Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/12/01 18:44:35 UTC

[GitHub] [spark] sunchao commented on a change in pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

sunchao commented on a change in pull request #32875:
URL: https://github.com/apache/spark/pull/32875#discussion_r760476421



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -70,13 +70,63 @@ case class EnsureRequirements(
     val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
       case (UnspecifiedDistribution, _) => false
       case (_: BroadcastDistribution, _) => false
+      case (AllTuples, _) => false
       case _ => true
     }.map(_._2)
 
-    val childrenNumPartitions =
-      childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
+    // If there is more than one child, we need to check their partitioning and distribution
+    // to see whether extra shuffles are necessary.
+    if (childrenIndexes.length > 1) {
+      childrenIndexes.map(requiredChildDistributions(_)).foreach { d =>
+        if (!d.isInstanceOf[ClusteredDistribution]) {
+          throw new IllegalStateException(s"Expected ClusteredDistribution but found " +
+              s"${d.getClass.getSimpleName}")
+        }
+      }
+      val specs = childrenIndexes.map(i =>
+        i -> children(i).outputPartitioning.createShuffleSpec(
+          requiredChildDistributions(i).requiredNumPartitions.getOrElse(conf.numShufflePartitions),
+          requiredChildDistributions(i).asInstanceOf[ClusteredDistribution])
+      ).toMap
+
+      // Find the shuffle spec that gives the best parallelism.
+      //
+      // NOTE: this is not optimal for the case when there are more than 2 children. Consider:
+      //   (10, 10, 11)
+      // it's better to pick 10 in this case since we only need to shuffle one side - we'd need to
+      // shuffle two sides if we pick 11.
+      //
+      // However, this should be sufficient for now, since in Spark, nodes with multiple
+      // children always have exactly 2 children.
+      //
+      // Also, when choosing the spec, we should prefer children with no `Exchange` node.
+      // For instance, if we have:
+      //   A: (No_Exchange, 100) <---> B: (Exchange, 120)
+      // it's better to pick A and change B to (Exchange, 100) than to pick B and insert a
+      // new shuffle for A.
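
To make the heuristic in the comment above concrete, here is a minimal, self-contained Scala
sketch: prefer children that are not already behind an `Exchange`, and among those pick the
one with the most partitions. `ChildInfo`, `hasExchange`, and `chooseBest` are names invented
for this sketch only, not the `ShuffleSpec` machinery the PR actually uses.

    // Illustrative model only; the types and field names are assumptions, not Spark's own.
    case class ChildInfo(index: Int, numPartitions: Int, hasExchange: Boolean)

    object ShuffleSpecSelection {
      // Pick the child whose partitioning the other children should shuffle to:
      // prefer children not already behind an Exchange, then maximize parallelism.
      def chooseBest(children: Seq[ChildInfo]): ChildInfo = {
        val (noExchange, withExchange) = children.partition(c => !c.hasExchange)
        val candidates = if (noExchange.nonEmpty) noExchange else withExchange
        candidates.maxBy(_.numPartitions)
      }

      def main(args: Array[String]): Unit = {
        // A: (No_Exchange, 100) vs. B: (Exchange, 120) -- the sketch picks A, so only B
        // gets re-shuffled (to 100 partitions) instead of adding a new shuffle for A.
        val a = ChildInfo(0, numPartitions = 100, hasExchange = false)
        val b = ChildInfo(1, numPartitions = 120, hasExchange = true)
        println(chooseBest(Seq(a, b)))  // ChildInfo(0,100,false)
      }
    }

Note that with three children at (10, 10, 11) and no Exchange anywhere, this picks 11 and
shuffles two sides, which is exactly the suboptimal case acknowledged in the comment; that is
acceptable because Spark nodes with multiple children always have exactly 2 children.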

Review comment:
       Updated the PR, please share your feedback on the new approach @cloud-fan 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


