You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/04/13 14:27:23 UTC
spark git commit: [SPARK-20233][SQL] Apply star-join filter
heuristics to dynamic programming join enumeration
Repository: spark
Updated Branches:
refs/heads/master a4293c284 -> fbe4216e1
[SPARK-20233][SQL] Apply star-join filter heuristics to dynamic programming join enumeration
## What changes were proposed in this pull request?
Implements a star-join filter to reduce the search space of the dynamic programming join enumeration. Consider the following join graph:
```
T1   D1 - T2 - T3
  \  /
   F1
   |
   D2
star-join: {F1, D1, D2}
non-star: {T1, T2, T3}
```
The following join combinations will be generated:
```
level 0: (F1), (D1), (D2), (T1), (T2), (T3)
level 1: {F1, D1}, {F1, D2}, {T2, T3}
level 2: {F1, D1, D2}
level 3: {F1, D1, D2, T1}, {F1, D1, D2, T2}
level 4: {F1, D1, D2, T1, T2}, {F1, D1, D2, T2, T3 }
level 5: {F1, D1, D2, T1, T2, T3}
```
## How was this patch tested?
New test suite `StarJoinCostBasedReorderSuite.scala`.
Author: Ioana Delaney <io...@gmail.com>
Closes #17546 from ioana-delaney/starSchemaCBOv3.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbe4216e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbe4216e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbe4216e
Branch: refs/heads/master
Commit: fbe4216e1e83d243a7f0521b76bfb20c25278281
Parents: a4293c2
Author: Ioana Delaney <io...@gmail.com>
Authored: Thu Apr 13 22:27:04 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Apr 13 22:27:04 2017 +0800
----------------------------------------------------------------------
.../optimizer/CostBasedJoinReorder.scala | 144 ++++++-
.../optimizer/StarSchemaDetection.scala | 2 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 8 +
.../StarJoinCostBasedReorderSuite.scala | 426 +++++++++++++++++++
4 files changed, 571 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fbe4216e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index cbd5064..c704c2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
val (items, conditions) = extractInnerJoins(plan)
- // TODO: Compute the set of star-joins and use them in the join enumeration
- // algorithm to prune un-optimal plan choices.
val result =
// Do reordering if the number of items is appropriate and join conditions exist.
// We also need to check if costs of all items can be evaluated.
@@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging {
case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
}.toMap)
+ // Build filters from the join graph to be used by the search algorithm.
+ val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)
+
// Build plans for next levels until the last level has only one plan. This plan contains
// all items that can be joined, so there's no need to continue.
val topOutputSet = AttributeSet(output)
- while (foundPlans.size < items.length && foundPlans.last.size > 1) {
+ while (foundPlans.size < items.length) {
// Build plans for the next level.
- foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet)
+ foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet, filters)
}
val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
@@ -179,7 +180,8 @@ object JoinReorderDP extends PredicateHelper with Logging {
existingLevels: Seq[JoinPlanMap],
conf: SQLConf,
conditions: Set[Expression],
- topOutput: AttributeSet): JoinPlanMap = {
+ topOutput: AttributeSet,
+ filters: Option[JoinGraphInfo]): JoinPlanMap = {
val nextLevel = mutable.Map.empty[Set[Int], JoinPlan]
var k = 0
@@ -200,7 +202,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
}
otherSideCandidates.foreach { otherSidePlan =>
- buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) match {
+ buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput, filters) match {
case Some(newJoinPlan) =>
// Check if it's the first plan for the item set, or it's a better plan than
// the existing one due to lower cost.
@@ -218,14 +220,20 @@ object JoinReorderDP extends PredicateHelper with Logging {
}
/**
- * Builds a new JoinPlan when both conditions hold:
+ * Builds a new JoinPlan if the following conditions hold:
* - the sets of items contained in left and right sides do not overlap.
* - there exists at least one join condition involving references from both sides.
+ * - if star-join filter is enabled, allow the following combinations:
+ * 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+ * 2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+ * 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+ *
* @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
* @param otherJoinPlan The other side JoinPlan for building a new join node.
* @param conf SQLConf for statistics computation.
* @param conditions The overall set of join conditions.
* @param topOutput The output attributes of the final plan.
+ * @param filters Join graph info to be used as filters by the search algorithm.
* @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
*/
private def buildJoin(
@@ -233,13 +241,27 @@ object JoinReorderDP extends PredicateHelper with Logging {
otherJoinPlan: JoinPlan,
conf: SQLConf,
conditions: Set[Expression],
- topOutput: AttributeSet): Option[JoinPlan] = {
+ topOutput: AttributeSet,
+ filters: Option[JoinGraphInfo]): Option[JoinPlan] = {
if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
// Should not join two overlapping item sets.
return None
}
+ if (filters.isDefined) {
+ // Apply star-join filter, which ensures that tables in a star schema relationship
+ // are planned together. The star-filter will eliminate joins among star and non-star
+ // tables until the star joins are built. The following combinations are allowed:
+ // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
+ // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
+ // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+ val isValidJoinCombination =
+ JoinReorderDPFilters.starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
+ filters.get)
+ if (!isValidJoinCombination) return None
+ }
+
val onePlan = oneJoinPlan.plan
val otherPlan = otherJoinPlan.plan
val joinConds = conditions
@@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging {
case class Cost(card: BigInt, size: BigInt) {
def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
}
+
+/**
+ * Optional pruning strategies that shrink the search space explored by the dynamic
+ * programming join enumeration:
+ *
+ * 1) Star-join filter: tables in a star-join are planned together, on the assumption
+ *    that their RI relationship gives them an optimal execution.
+ * 2) Cartesian products: defer them to later levels of the graph so that large
+ *    (in general, expanding) intermediate results are avoided.
+ * 3) Composite inners: do not generate "bushy tree" plans, so that intermediate
+ *    results are not materialized.
+ *
+ * Only (1) is implemented here; (2) and (3) are not implemented.
+ */
+object JoinReorderDPFilters extends PredicateHelper {
+
+  /**
+   * Collects the join graph information consumed by the filtering strategies.
+   *
+   * Currently this is the partition of `items` into star and non-star joins, expressed
+   * as item ids looked up in `itemIndex`. It could later be extended with the sets of
+   * connected/unconnected items, which would allow filtering Cartesian products.
+   *
+   * @return `None` when `conf.joinReorderDPStarFilter` is off, or when either the star
+   *         or the non-star group is empty; otherwise the two groups as id sets.
+   */
+  def buildJoinGraphInfo(
+      conf: SQLConf,
+      items: Seq[LogicalPlan],
+      conditions: Set[Expression],
+      itemIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+    if (!conf.joinReorderDPStarFilter) {
+      // The star schema filter is turned off.
+      None
+    } else {
+      // Detect the tables that take part in a star-schema relationship.
+      val stars = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
+      val others = items.filterNot(item => stars.contains(item))
+
+      if (stars.isEmpty || others.isEmpty) {
+        // Without both groups there is nothing for the filter to separate.
+        None
+      } else {
+        val idOf = itemIndex.toMap
+        val starIds = stars.map(idOf).toSet
+        val otherIds = others.map(idOf).toSet
+        Some(JoinGraphInfo(starIds, otherIds))
+      }
+    }
+  }
+
+  /**
+   * Star-join filter: prunes join combinations that mix star and non-star tables
+   * before the star join itself has been completely built.
+   *
+   * The two sides are sets of plan ids produced by the DP join enumeration. A
+   * combination is accepted only if the sides do not overlap and, when both the
+   * star and the non-star sets are non-empty, one of the following holds:
+   *   1. (oneSideJoinPlan U otherSideJoinPlan) is a subset of the star-join
+   *   2. the star-join is a subset of (oneSideJoinPlan U otherSideJoinPlan)
+   *   3. (oneSideJoinPlan U otherSideJoinPlan) is a subset of the non-star join
+   * If either the star or the non-star set is empty, every non-overlapping
+   * combination is accepted.
+   *
+   * Example query graph:
+   *
+   *   t1   d1 - t2 - t3
+   *     \  /
+   *      f1
+   *      |
+   *      d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   *
+   * @param oneSideJoinPlan One side of the join represented as a set of plan ids.
+   * @param otherSideJoinPlan The other side of the join represented as a set of plan ids.
+   * @param filters Star and non-star plans represented as sets of plan ids.
+   * @return true if the combination may be planned, false if it must be pruned.
+   */
+  def starJoinFilter(
+      oneSideJoinPlan: Set[Int],
+      otherSideJoinPlan: Set[Int],
+      filters: JoinGraphInfo): Boolean = {
+    val starIds = filters.starJoins
+    val nonStarIds = filters.nonStarJoins
+    val combined = oneSideJoinPlan ++ otherSideJoinPlan
+    val sidesOverlap = oneSideJoinPlan.exists(otherSideJoinPlan.contains)
+
+    if (sidesOverlap) {
+      false
+    } else if (starIds.isEmpty || nonStarIds.isEmpty) {
+      // Only one group exists, so there is nothing to keep apart.
+      true
+    } else {
+      combined.subsetOf(starIds) ||
+        starIds.subsetOf(combined) ||
+        combined.subsetOf(nonStarIds)
+    }
+  }
+}
+
+/**
+ * Join graph information expressed as sets of item (plan) ids.
+ *
+ * Only the star/non-star split is stored today; further sets, such as the
+ * connected/unconnected plans, can be added later.
+ *
+ * @param starJoins ids of the items that form the star join.
+ * @param nonStarJoins ids of the remaining items.
+ */
+case class JoinGraphInfo(starJoins: Set[Int], nonStarJoins: Set[Int])
http://git-wip-us.apache.org/repos/asf/spark/blob/fbe4216e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
index 91cb004..97ee998 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
@@ -76,7 +76,7 @@ case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {
val emptyStarJoinPlan = Seq.empty[LogicalPlan]
- if (!conf.starSchemaDetection || input.size < 2) {
+ if (input.size < 2) {
emptyStarJoinPlan
} else {
// Find if the input plans are eligible for star join detection.
http://git-wip-us.apache.org/repos/asf/spark/blob/fbe4216e/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6b0f495..2e1798e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -736,6 +736,12 @@ object SQLConf {
.checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
.createWithDefault(0.7)
+  // Switch for pruning the DP join enumeration with the star-join filter;
+  // disabled unless explicitly turned on.
+  val JOIN_REORDER_DP_STAR_FILTER = buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+    .doc("Applies star-join filter heuristics to cost based join enumeration.")
+    .booleanConf
+    .createWithDefault(false)
+
val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
.doc("When true, it enables join reordering based on star schema detection. ")
.booleanConf
@@ -1011,6 +1017,8 @@ class SQLConf extends Serializable with Logging {
def joinReorderCardWeight: Double = getConf(SQLConf.JOIN_REORDER_CARD_WEIGHT)
+  /** Whether the star-join filter is applied to the DP join enumeration. */
+  def joinReorderDPStarFilter: Boolean = getConf(JOIN_REORDER_DP_STAR_FILTER)
+
def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
def sortMergeJoinExecBufferSpillThreshold: Int =
http://git-wip-us.apache.org/repos/asf/spark/blob/fbe4216e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
new file mode 100644
index 0000000..a23d626
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+/**
+ * Tests for the star-join filter used by the dynamic programming join reorder
+ * (see [[JoinReorderDPFilters]]).
+ *
+ * Each test builds a query over a fact table (f1), dimension tables (d1-d3) and
+ * regular tables (t1-t6) with fixed column statistics. It runs [[Optimize]] with CBO,
+ * join reordering and `JOIN_REORDER_DP_STAR_FILTER` enabled, and compares the join
+ * order of the result with an expected plan.
+ */
+class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+    CBO_ENABLED -> true,
+    JOIN_REORDER_ENABLED -> true,
+    JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Operator Optimizations", FixedPoint(100),
+        CombineFilters,
+        PushDownPredicate,
+        ReorderJoin(conf),
+        PushPredicateThroughJoin,
+        ColumnPruning,
+        CollapseProject) ::
+      Batch("Join Reorder", Once,
+        CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+    // F1 (fact table)
+    attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // D1 (dimension)
+    attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // D2 (dimension)
+    attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // D3 (dimension)
+    attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // T1 (regular table i.e. outside star)
+    attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t1_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t1_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T2 (regular table)
+    attr("t2_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t2_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t2_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T3 (regular table)
+    attr("t3_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T4 (regular table)
+    attr("t4_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t4_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t4_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T5 (regular table)
+    attr("t5_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t5_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t5_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T6 (regular table)
+    attr("t6_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t6_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t6_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4)
+  ))
+
+  private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
+  private val nameToColInfo: Map[String, (Attribute, ColumnStat)] =
+    columnInfo.map(kv => kv._1.name -> kv)
+
+  private val f1 = StatsTestPlan(
+    outputList = Seq("f1_fk1", "f1_fk2", "f1_fk3", "f1_c1", "f1_c2").map(nameToAttr),
+    rowCount = 1000,
+    size = Some(1000 * (8 + 4 * 5)),
+    attributeStats = AttributeMap(Seq("f1_fk1", "f1_fk2", "f1_fk3", "f1_c1", "f1_c2")
+      .map(nameToColInfo)))
+
+  // To control the layout of the join plans, keep the size for the non-fact tables constant
+  // and vary the rowcount and the number of distinct values of the join columns.
+  private val d1 = StatsTestPlan(
+    outputList = Seq("d1_pk", "d1_c2", "d1_c3").map(nameToAttr),
+    rowCount = 100,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("d1_pk", "d1_c2", "d1_c3").map(nameToColInfo)))
+
+  private val d2 = StatsTestPlan(
+    outputList = Seq("d2_pk", "d2_c2", "d2_c3").map(nameToAttr),
+    rowCount = 20,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("d2_pk", "d2_c2", "d2_c3").map(nameToColInfo)))
+
+  private val d3 = StatsTestPlan(
+    outputList = Seq("d3_pk", "d3_c2", "d3_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("d3_pk", "d3_c2", "d3_c3").map(nameToColInfo)))
+
+  private val t1 = StatsTestPlan(
+    outputList = Seq("t1_c1", "t1_c2", "t1_c3").map(nameToAttr),
+    rowCount = 50,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t1_c1", "t1_c2", "t1_c3").map(nameToColInfo)))
+
+  private val t2 = StatsTestPlan(
+    outputList = Seq("t2_c1", "t2_c2", "t2_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t2_c1", "t2_c2", "t2_c3").map(nameToColInfo)))
+
+  private val t3 = StatsTestPlan(
+    outputList = Seq("t3_c1", "t3_c2", "t3_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t3_c1", "t3_c2", "t3_c3").map(nameToColInfo)))
+
+  private val t4 = StatsTestPlan(
+    outputList = Seq("t4_c1", "t4_c2", "t4_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t4_c1", "t4_c2", "t4_c3").map(nameToColInfo)))
+
+  private val t5 = StatsTestPlan(
+    outputList = Seq("t5_c1", "t5_c2", "t5_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t5_c1", "t5_c2", "t5_c3").map(nameToColInfo)))
+
+  private val t6 = StatsTestPlan(
+    outputList = Seq("t6_c1", "t6_c2", "t6_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t6_c1", "t6_c2", "t6_c3").map(nameToColInfo)))
+
+  test("Test 1: Star query with two dimensions and two regular tables") {
+
+    //  d1     t1
+    //    \   /
+    //     f1
+    //    /   \
+    //  d2     t2
+    //
+    // star: {f1, d1, d2}
+    // non-star: {t1, t2}
+    //
+    // level 0: (t2 ), (d2 ), (f1 ), (d1 ), (t1 )
+    // level 1: {f1 d1 }, {d2 f1 }
+    // level 2: {d2 f1 d1 }
+    // level 3: {t2 d1 d2 f1 }, {t1 d1 d2 f1 }
+    // level 4: {f1 t1 t2 d1 d2 }
+    //
+    // Number of generated plans: 11 (vs. 20 w/o filter)
+    val query =
+      f1.join(t1).join(t2).join(d1).join(d2)
+        .where((nameToAttr("f1_c1") === nameToAttr("t1_c1")) &&
+          (nameToAttr("f1_c2") === nameToAttr("t2_c1")) &&
+          (nameToAttr("f1_fk1") === nameToAttr("d1_pk")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+
+    val expected =
+      f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t2, Inner, Some(nameToAttr("f1_c2") === nameToAttr("t2_c1")))
+        .join(t1, Inner, Some(nameToAttr("f1_c1") === nameToAttr("t1_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 2: Star with a linear branch") {
+    //
+    //  t1   d1 - t2 - t3
+    //    \  /
+    //     f1
+    //     |
+    //     d2
+    //
+    // star: {d1, f1, d2}
+    // non-star: {t2, t1, t3}
+    //
+    // level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+    // level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+    // level 2: {d2 f1 d1 }
+    // level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+    // level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+    // level 5: {d1 t3 t2 f1 t1 d2 }
+    //
+    // Number of generated plans: 15 (vs 24)
+    //
+    // NOTE(review): the predicate on d2_pk/f1_fk2 appears twice below with its operands
+    // swapped. It is kept as-is so the optimizer input is unchanged; confirm whether the
+    // duplicate is intentional.
+    val query =
+      d1.join(t1).join(t2).join(f1).join(d2).join(t3)
+        .where((nameToAttr("d1_pk") === nameToAttr("f1_fk1")) &&
+          (nameToAttr("t1_c1") === nameToAttr("f1_c1")) &&
+          (nameToAttr("d2_pk") === nameToAttr("f1_fk2")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")) &&
+          (nameToAttr("d1_c2") === nameToAttr("t2_c1")) &&
+          (nameToAttr("t2_c2") === nameToAttr("t3_c1")))
+
+    val expected =
+      f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t3.join(t2, Inner, Some(nameToAttr("t2_c2") === nameToAttr("t3_c1"))), Inner,
+          Some(nameToAttr("d1_c2") === nameToAttr("t2_c1")))
+        .join(t1, Inner, Some(nameToAttr("t1_c1") === nameToAttr("f1_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 3: Star with derived branches") {
+    //       t3   t2
+    //       |    |
+    //  d1 - t4 - t1
+    //  |
+    //  f1
+    //  |
+    //  d2
+    //
+    // star: (d1 f1 d2 )
+    // non-star: (t4 t1 t2 t3 )
+    //
+    // level 0: (t1 ), (t3 ), (f1 ), (d1 ), (t2 ), (d2 ), (t4 )
+    // level 1: {f1 d2 }, {t1 t4 }, {t1 t2 }, {f1 d1 }, {t3 t4 }
+    // level 2: {d1 f1 d2 }, {t2 t1 t4 }, {t1 t3 t4 }
+    // level 3: {t4 d1 f1 d2 }, {t3 t4 t1 t2 }
+    // level 4: {d1 f1 t4 d2 t3 }, {d1 f1 t4 d2 t1 }
+    // level 5: {d1 f1 t4 d2 t1 t2 }, {d1 f1 t4 d2 t1 t3 }
+    // level 6: {d1 f1 t4 d2 t1 t2 t3 }
+    //
+    // Number of generated plans: 22 (vs. 34)
+    val query =
+      d1.join(t1).join(t2).join(t3).join(t4).join(f1).join(d2)
+        .where((nameToAttr("t1_c1") === nameToAttr("t2_c1")) &&
+          (nameToAttr("t3_c1") === nameToAttr("t4_c1")) &&
+          (nameToAttr("t1_c2") === nameToAttr("t4_c2")) &&
+          (nameToAttr("d1_c2") === nameToAttr("t4_c3")) &&
+          (nameToAttr("f1_fk1") === nameToAttr("d1_pk")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+
+    // {t3, t4} is attached to the star through the query's d1_c2 = t4_c3 predicate;
+    // t3_c1 = t4_c1 is only the condition of the inner t3/t4 join.
+    val expected =
+      f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1"))), Inner,
+          Some(nameToAttr("d1_c2") === nameToAttr("t4_c3")))
+        .join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
+          Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 4: Star with several branches") {
+    //
+    //  d1 - t3 - t4
+    //  |
+    //  f1 - d3 - t1 - t2
+    //  |
+    //  d2 - t5 - t6
+    //
+    // star: {d1 f1 d2 d3 }
+    // non-star: {t5 t3 t6 t2 t4 t1}
+    //
+    // level 0: (t4 ), (d2 ), (t5 ), (d3 ), (d1 ), (f1 ), (t2 ), (t6 ), (t1 ), (t3 )
+    // level 1: {t5 t6 }, {t4 t3 }, {d3 f1 }, {t2 t1 }, {d2 f1 }, {d1 f1 }
+    // level 2: {d2 d1 f1 }, {d2 d3 f1 }, {d3 d1 f1 }
+    // level 3: {d2 d1 d3 f1 }
+    // level 4: {d1 t3 d3 f1 d2 }, {d1 d3 f1 t1 d2 }, {d1 t5 d3 f1 d2 }
+    // level 5: {d1 t5 d3 f1 t1 d2 }, {d1 t3 t4 d3 f1 d2 }, {d1 t5 t6 d3 f1 d2 },
+    //          {d1 t5 t3 d3 f1 d2 }, {d1 t3 d3 f1 t1 d2 }, {d1 t2 d3 f1 t1 d2 }
+    // level 6: {d1 t5 t3 t4 d3 f1 d2 }, {d1 t3 t2 d3 f1 t1 d2 }, {d1 t5 t6 d3 f1 t1 d2 },
+    //          {d1 t5 t3 d3 f1 t1 d2 }, {d1 t5 t2 d3 f1 t1 d2 }, ...
+    // ...
+    // level 9: {d1 t5 t3 t6 t2 t4 d3 f1 t1 d2 }
+    //
+    // Number of generated plans: 46 (vs. 82)
+    val query =
+      d1.join(t3).join(t4).join(f1).join(d2).join(t5).join(t6).join(d3).join(t1).join(t2)
+        .where((nameToAttr("d1_c2") === nameToAttr("t3_c1")) &&
+          (nameToAttr("t3_c2") === nameToAttr("t4_c2")) &&
+          (nameToAttr("d1_pk") === nameToAttr("f1_fk1")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")) &&
+          (nameToAttr("d2_c2") === nameToAttr("t5_c1")) &&
+          (nameToAttr("t5_c2") === nameToAttr("t6_c2")) &&
+          (nameToAttr("f1_fk3") === nameToAttr("d3_pk")) &&
+          (nameToAttr("d3_c2") === nameToAttr("t1_c1")) &&
+          (nameToAttr("t1_c2") === nameToAttr("t2_c2")))
+
+    val expected =
+      f1.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(t4.join(t3, Inner, Some(nameToAttr("t3_c2") === nameToAttr("t4_c2"))), Inner,
+          Some(nameToAttr("d1_c2") === nameToAttr("t3_c1")))
+        .join(t2.join(t1, Inner, Some(nameToAttr("t1_c2") === nameToAttr("t2_c2"))), Inner,
+          Some(nameToAttr("d3_c2") === nameToAttr("t1_c1")))
+        .join(t5.join(t6, Inner, Some(nameToAttr("t5_c2") === nameToAttr("t6_c2"))), Inner,
+          Some(nameToAttr("d2_c2") === nameToAttr("t5_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 5: RI star only") {
+    //     d1
+    //     |
+    //     f1
+    //    /  \
+    //  d2    d3
+    //
+    // star: {f1, d1, d2, d3}
+    // non-star: {}
+    // level 0: (d1), (f1), (d2), (d3)
+    // level 1: {f1 d3 }, {f1 d2 }, {d1 f1 }
+    // level 2: {d1 f1 d2 }, {d2 f1 d3 }, {d1 f1 d3 }
+    // level 3: {d1 d2 f1 d3 }
+    // Number of generated plans: 11 (= 11)
+    val query =
+      d1.join(d2).join(f1).join(d3)
+        .where((nameToAttr("f1_fk1") === nameToAttr("d1_pk")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")) &&
+          (nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
+
+    val expected =
+      f1.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
+        .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 6: No RI star") {
+    //
+    //  t1 - f1 - t2
+    //       |
+    //       t3
+    //
+    // star: {}
+    // non-star: {f1, t1, t2, t3}
+    // level 0: (t1), (f1), (t2), (t3)
+    // level 1: {f1 t3 }, {f1 t2 }, {t1 f1 }
+    // level 2: {t1 f1 t2 }, {t2 f1 t3 }, {t1 f1 t3 }
+    // level 3: {t1 t2 f1 t3 }
+    // Number of generated plans: 11 (= 11)
+    val query =
+      t1.join(f1).join(t2).join(t3)
+        .where((nameToAttr("f1_fk1") === nameToAttr("t1_c1")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("t2_c1")) &&
+          (nameToAttr("f1_fk3") === nameToAttr("t3_c1")))
+
+    val expected =
+      f1.join(t3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("t3_c1")))
+        .join(t2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("t2_c1")))
+        .join(t1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("t1_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  /**
+   * Analyzes and optimizes `plan1` with [[Optimize]], analyzes `plan2`, and checks the
+   * two with `compareJoinOrder`.
+   */
+  private def assertEqualPlans(plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
+    val optimized = Optimize.execute(plan1.analyze)
+    val expected = plan2.analyze
+    compareJoinOrder(optimized, expected)
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org