You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/25 16:58:25 UTC

[GitHub] [spark] tanelk commented on a change in pull request #29871: [SPARK-32995][SQL] CostBasedJoinReorder optimizer rule should be idempotent

tanelk commented on a change in pull request #29871:
URL: https://github.com/apache/spark/pull/29871#discussion_r495116397



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
##########
@@ -40,30 +40,35 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
     if (!conf.cboEnabled || !conf.joinReorderEnabled) {
       plan
     } else {
-      val result = plan transformDown {
-        // Start reordering with a joinable item, which is an InnerLike join with conditions.
-        // Avoid reordering if a join hint is present.
-        case j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE) =>
-          reorder(j, j.output)
-        case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE))
-          if projectList.forall(_.isInstanceOf[Attribute]) =>
-          reorder(p, p.output)
-      }
+      val result = plan transformDown applyLocally
       // After reordering is finished, convert OrderedJoin back to Join.
       result transform {
         case OrderedJoin(left, right, jt, cond) => Join(left, right, jt, cond, JoinHint.NONE)
       }
     }
   }
 
+  private val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
+    // Start reordering with a joinable item, which is an InnerLike join with conditions.
+    // Avoid reordering if a join hint is present.
+    case j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE) =>
+      reorder(j, j.output)
+    case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE))
+      if projectList.forall(_.isInstanceOf[Attribute]) =>
+      reorder(p, p.output)
+
+  }
+
   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
     val result =
       // Do reordering if the number of items is appropriate and join conditions exist.
       // We also need to check if costs of all items can be evaluated.
       if (items.size > 2 && items.size <= conf.joinReorderDPThreshold && conditions.nonEmpty &&
           items.forall(_.stats.rowCount.isDefined)) {
-        JoinReorderDP.search(conf, items, conditions, output)
+        // Transform and sort all the items to guarantee idempotence
+        val transformedItems = items.map(_ transformDown applyLocally).sortBy(_.semanticHash())
+        JoinReorderDP.search(conf, transformedItems, conditions, output)

Review comment:
       This is the most relevant change - previously we reordered the joins from top to bottom. Now we reorder the joins in the lower groups first and then reorder the ones on the top.  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org