You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/03/17 06:14:14 UTC

[GitHub] [spark] Ngone51 commented on a change in pull request #27893: [SPARK-31134][SQL] optimize skew join after shuffle partitions are coalesced

Ngone51 commented on a change in pull request #27893: [SPARK-31134][SQL] optimize skew join after shuffle partitions are coalesced
URL: https://github.com/apache/spark/pull/27893#discussion_r393463110
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##########
 @@ -150,146 +155,97 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
    */
   def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
     case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, left: ShuffleQueryStageExec, _),
-        s2 @ SortExec(_, _, right: ShuffleQueryStageExec, _), _)
+        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
+        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
         if supportedJoinTypes.contains(joinType) =>
-      val leftStats = getStatistics(left)
-      val rightStats = getStatistics(right)
-      val numPartitions = leftStats.bytesByPartitionId.length
-
-      val leftMedSize = medianSize(leftStats)
-      val rightMedSize = medianSize(rightStats)
+      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
+      val numPartitions = left.partitionsWithSizes.length
+      // We use the median size of the original shuffle partitions to detect skewed partitions.
+      val leftMedSize = medianSize(left.mapStats)
+      val rightMedSize = medianSize(right.mapStats)
       logDebug(
         s"""
-          |Try to optimize skewed join.
-          |Left side partition size:
-          |${getSizeInfo(leftMedSize, leftStats.bytesByPartitionId.max)}
-          |Right side partition size:
-          |${getSizeInfo(rightMedSize, rightStats.bytesByPartitionId.max)}
+          |Optimizing skewed join.
+          |Left side partitions size info:
+          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+          |Right side partitions size info:
+          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
         """.stripMargin)
       val canSplitLeft = canSplitLeftSide(joinType)
       val canSplitRight = canSplitRightSide(joinType)
-      val leftTargetSize = targetSize(leftStats, leftMedSize)
-      val rightTargetSize = targetSize(rightStats, rightMedSize)
+      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
+      // the final data distribution is even (coalesced partitions + split partitions).
+      val leftActualSizes = left.partitionsWithSizes.map(_._2)
+      val rightActualSizes = right.partitionsWithSizes.map(_._2)
+      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
+      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
 
       val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
       val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      // This is used to delay the creation of non-skew partitions so that we can potentially
-      // coalesce them like `CoalesceShufflePartitions` does.
-      val nonSkewPartitionIndices = mutable.ArrayBuffer.empty[Int]
       val leftSkewDesc = new SkewDesc
       val rightSkewDesc = new SkewDesc
       for (partitionIndex <- 0 until numPartitions) {
-        val leftSize = leftStats.bytesByPartitionId(partitionIndex)
-        val isLeftSkew = isSkewed(leftSize, leftMedSize) && canSplitLeft
-        val rightSize = rightStats.bytesByPartitionId(partitionIndex)
-        val isRightSkew = isSkewed(rightSize, rightMedSize) && canSplitRight
-        if (isLeftSkew || isRightSkew) {
-          if (nonSkewPartitionIndices.nonEmpty) {
-            // As soon as we see a skew, we'll "flush" out unhandled non-skew partitions.
-            createNonSkewPartitions(leftStats, rightStats, nonSkewPartitionIndices).foreach { p =>
-              leftSidePartitions += p
-              rightSidePartitions += p
-            }
-            nonSkewPartitionIndices.clear()
-          }
-
-          val leftParts = if (isLeftSkew) {
-            val mapStartIndices = getMapStartIndices(left, partitionIndex, leftTargetSize)
-            if (mapStartIndices.length > 1) {
-              leftSkewDesc.addPartitionSize(leftSize)
-              createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(left))
-            } else {
-              Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
-            }
-          } else {
-            Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
-          }
-
-          val rightParts = if (isRightSkew) {
-            val mapStartIndices = getMapStartIndices(right, partitionIndex, rightTargetSize)
-            if (mapStartIndices.length > 1) {
-              rightSkewDesc.addPartitionSize(rightSize)
-              createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(right))
-            } else {
-              Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
-            }
-          } else {
-            Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
+        val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
+        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
+        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
+
+        val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
+        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
+        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
+
+        // A skewed partition should never be coalesced, but skip it here just to be safe.
 
 Review comment:
   Oh yeah, I checked, you're right!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org