Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/10 10:56:51 UTC

[GitHub] [spark] ulysses-you commented on a diff in pull request #36438: [SPARK-39092][SQL] Propagate Empty Partitions

ulysses-you commented on code in PR #36438:
URL: https://github.com/apache/spark/pull/36438#discussion_r869099789


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:
##########
@@ -117,6 +118,112 @@ object ShufflePartitionsUtil extends Logging {
       return Seq.empty
     }
 
+    val emptyIndexSet = collection.mutable.Set.empty[Int]
+    inputPartitionSpecs.foreach(_.get.iterator.zipWithIndex.foreach {
+      case (EmptyPartitionSpec, i) => emptyIndexSet.add(i)
+      case _ =>
+    })
+
+    if (emptyIndexSet.isEmpty) {
+      return coalescePartitionsWithSkew(mapOutputStatistics, inputPartitionSpecs,
+        targetSize, minPartitionSize, true)
+    }
+
+    // when all partitions are empty, return single EmptyPartitionSpec here to satisfy
+    // SPARK-32083 (AQE coalesce should at least return one partition).
+    if (inputPartitionSpecs.flatten.flatten.forall(_ == EmptyPartitionSpec)) {
+      return inputPartitionSpecs.map(_ => Seq(EmptyPartitionSpec))
+    }
+
+    // ShufflePartitionSpecs at these emptyIndices can NOT be coalesced
+    // split inputPartitionSpecs into sub-sequences by the empty indices, and
+    // call coalescePartitionsWithSkew to optimize each sub-sequence.
+    // let inputPartitionSpecs are:
+    //   [A0(empty), A1, A2, A3(empty), A4(empty), A5, A6, A7, A8, A9, A10]
+    //   [B0, B1, B2, B3, B4(empty), B5, B6, B7, B8(empty), B9, B10]
+    // then:
+    // 1, specs at index (0, 3, 8) are kept: (A0(empty)-B0), (A3(empty)-B3), (A8-B8(empty))
+    // 2, specs at index 4 are discarded, since they are all empty: (A4(empty)-B4(empty))
+    // 3, sub-sequences [A1-B1, A2-B2], [A5-B5, A6-B6, A7-B7], [A9-B9, A10-B10] are optimized

Review Comment:
   Would this be a regression?
   
   Suppose advisoryPartitionSizeInBytes is 60 and the two sides each have 4 partitions, with sizes:
   [10, 10, 0, 10]
   [10, 0, 10, 10]
   
   `CoalesceShufflePartitions` currently returns one reduce task for this input. But if we mark the zero-size partitions as empty, we will get three reduce tasks.
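   
   A minimal sketch of the arithmetic behind this concern. It is a simplified greedy packer, not Spark's actual `ShufflePartitionsUtil` logic, and it ignores minPartitionSize and skew handling. The names `CoalesceSketch`, `coalesce` and `emptyIndices` are hypothetical, and a size of 0 is assumed to stand in for `EmptyPartitionSpec`:

```scala
object CoalesceSketch {
  // Hypothetical simplification, not Spark's implementation: greedily pack adjacent
  // reduce partitions, closing a group when adding the next partition would exceed
  // targetSize. Returns [start, end) index ranges, one per reduce task.
  def coalesce(sizes: Seq[Long], targetSize: Long): Seq[(Int, Int)] = {
    val ranges = Seq.newBuilder[(Int, Int)]
    var start = 0
    var current = 0L
    for (i <- sizes.indices) {
      if (i > start && current + sizes(i) > targetSize) {
        ranges += ((start, i))
        start = i
        current = 0L
      }
      current += sizes(i)
    }
    if (start < sizes.length) ranges += ((start, sizes.length))
    ranges.result()
  }

  // Assumption: a partition of size 0 models EmptyPartitionSpec.
  // Collects every index at which at least one side is empty.
  def emptyIndices(sides: Seq[Seq[Long]]): Set[Int] =
    sides.flatMap(_.zipWithIndex.collect { case (0L, i) => i }).toSet

  def main(args: Array[String]): Unit = {
    val sides = Seq(Seq(10L, 10L, 0L, 10L), Seq(10L, 0L, 10L, 10L))
    // Per-index totals across both sides: 20, 10, 10, 20 (sum 60).
    val totals = sides.transpose.map(_.sum)
    println(coalesce(totals, targetSize = 60L))
    println(emptyIndices(sides))
  }
}
```

   Under these assumptions the per-index totals add up to exactly 60, so the greedy pass yields the single range (0, 4), that is, one reduce task. The empty index set is {1, 2}, and if those indices cannot be coalesced with their neighbours, that single range can no longer be formed. The sketch does not try to reproduce the exact task count after splitting, since that depends on how the PR handles the kept specs.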
   



