Posted to reviews@spark.apache.org by nsyca <gi...@git.apache.org> on 2017/03/13 19:19:21 UTC

[GitHub] spark pull request #17240: [SPARK-19915][SQL] Improve join reorder: simplify...

Github user nsyca commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17240#discussion_r105748347
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
    @@ -204,63 +206,37 @@ object JoinReorderDP extends PredicateHelper {
           oneJoinPlan: JoinPlan,
           otherJoinPlan: JoinPlan,
           conf: CatalystConf,
    -      conditions: Set[Expression],
    -      topOutput: AttributeSet): JoinPlan = {
    +      conditions: Set[Expression]): Option[JoinPlan] = {
     
         val onePlan = oneJoinPlan.plan
         val otherPlan = otherJoinPlan.plan
    -    // Now both onePlan and otherPlan become intermediate joins, so the cost of the
    -    // new join should also include their own cardinalities and sizes.
    -    val newCost = if (isCartesianProduct(onePlan) || isCartesianProduct(otherPlan)) {
    -      // We consider cartesian product very expensive, thus set a very large cost for it.
    -      // This enables to plan all the cartesian products at the end, because having a cartesian
    -      // product as an intermediate join will significantly increase a plan's cost, making it
    -      // impossible to be selected as the best plan for the items, unless there's no other choice.
    -      Cost(
    -        rows = BigInt(Long.MaxValue) * BigInt(Long.MaxValue),
    -        size = BigInt(Long.MaxValue) * BigInt(Long.MaxValue))
    -    } else {
    -      val onePlanStats = onePlan.stats(conf)
    -      val otherPlanStats = otherPlan.stats(conf)
    -      Cost(
    -        rows = oneJoinPlan.cost.rows + onePlanStats.rowCount.get +
    -          otherJoinPlan.cost.rows + otherPlanStats.rowCount.get,
    -        size = oneJoinPlan.cost.size + onePlanStats.sizeInBytes +
    -          otherJoinPlan.cost.size + otherPlanStats.sizeInBytes)
    -    }
    -
    -    // Put the deeper side on the left, tend to build a left-deep tree.
    -    val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) {
    -      (onePlan, otherPlan)
    -    } else {
    -      (otherPlan, onePlan)
    -    }
         val joinConds = conditions
           .filterNot(l => canEvaluate(l, onePlan))
           .filterNot(r => canEvaluate(r, otherPlan))
           .filter(e => e.references.subsetOf(onePlan.outputSet ++ otherPlan.outputSet))
    -    // We use inner join whether join condition is empty or not. Since cross join is
    -    // equivalent to inner join without condition.
    -    val newJoin = Join(left, right, Inner, joinConds.reduceOption(And))
    -    val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
    -    val remainingConds = conditions -- collectedJoinConds
    -    val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
    -    val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains)
    -    val newPlan =
    -      if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
    -        Project(neededFromNewJoin.toSeq, newJoin)
    +    if (joinConds.isEmpty) {
    +      // Cartesian product is very expensive, so we exclude them from candidate plans.
    +      // This also helps us to reduce the search space. Unjoinable items will be put at the end
    +      // of the plan when the reordering phase finishes.
    +      None
    +    } else {
    +      // Put the deeper side on the left, tend to build a left-deep tree.
    +      val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) {
    +        (onePlan, otherPlan)
           } else {
    -        newJoin
    +        (otherPlan, onePlan)
           }
    +      val newJoin = Join(left, right, Inner, joinConds.reduceOption(And))
    +      val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds)
     
    -    val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds)
    -    JoinPlan(itemIds, newPlan, collectedJoinConds, newCost)
    -  }
    +      // Now onePlan/otherPlan becomes an intermediate join (if it's a non-leaf item),
    +      // so the cost of the new join should also include their own cardinalities.
    +      val newCost = oneJoinPlan.cost + otherJoinPlan.cost +
    +        (if (oneJoinPlan.itemIds.size > 1) onePlan.stats(conf).rowCount.get else 0) +
    +        (if (otherJoinPlan.itemIds.size > 1) otherPlan.stats(conf).rowCount.get else 0)
    --- End diff --
    
    This may not be a problem with this PR, but because the rowCount does not take the filtering effect of any local predicate into account, the outcome of this join reorder will be less reliable.
    
    For example:
    `select * from t1, t2, t3 where t1.c1 = t2.c1 and t1.c2 = t3.c2`
    and
    `select * from t1, t2, t3 where t1.c1 = t2.c1 and t1.c2 = t3.c2 and t2.c2=1`
    
    In both cases, the rowCount of relation T2 that this join reordering code sees is the same. With the predicate `t2.c2=1` present, however, the cardinality of the join input from T2 could be substantially smaller.
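
To make the concern concrete, here is a minimal toy sketch. It is not Catalyst code: `Relation`, `localSelectivity`, `firstJoinPartner`, the row counts, and the assumed 0.1% selectivity of `t2.c2=1` are all hypothetical and chosen only for illustration. It shows how the choice of which table to join with t1 first could flip once the local predicate is reflected in the estimate.

```scala
object LocalPredicateSketch {
  // Toy stand-in for a base relation; NOT Spark's LogicalPlan or Statistics.
  // localSelectivity is the assumed fraction of rows surviving local predicates.
  final case class Relation(
      name: String,
      rowCount: BigInt,
      localSelectivity: BigDecimal = BigDecimal(1)) {

    // Rows the join would actually receive after local predicates are applied.
    def filteredRowCount: BigInt =
      (BigDecimal(rowCount) * localSelectivity)
        .setScale(0, BigDecimal.RoundingMode.CEILING)
        .toBigInt
  }

  // Greedy illustration only: join t1 first with the candidate whose
  // estimated input cardinality is smallest under the given estimator.
  def firstJoinPartner(candidates: Seq[Relation], card: Relation => BigInt): Relation =
    candidates.minBy(card)

  // Made-up statistics: t2 is large, but t2.c2 = 1 is assumed to keep 0.1% of it.
  val t2 = Relation("t2", BigInt(1000000), BigDecimal("0.001"))
  val t3 = Relation("t3", BigInt(100000))

  // Estimator that ignores local predicates (same input for both example queries).
  val ignoringFilter: Relation = firstJoinPartner(Seq(t2, t3), _.rowCount)

  // Estimator that accounts for the local predicate on t2.
  val withFilter: Relation = firstJoinPartner(Seq(t2, t3), _.filteredRowCount)
}
```

Under these assumed numbers, the unfiltered estimate prefers t3 as the first join partner, while the filter-aware estimate prefers t2. The real cost model in this PR sums intermediate cardinalities rather than picking greedily, so this only illustrates the direction of the effect.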


