You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/10/19 05:33:21 UTC

[GitHub] [spark] mridulm commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

mridulm commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r731503157



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted

Review comment:
       Given the cost, compute this only if required ?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted
+      if (totalNumBlocks % 2 == 0) {
+        Math.max((sortedSizes(totalNumBlocks / 2) + sortedSizes(totalNumBlocks / 2 - 1)) / 2, 1)
+      } else {
+        Math.max(sortedSizes(totalNumBlocks / 2), 1)
+      }
+    }
+    val skewSizeThreshold =
+      medianSize * Option(SparkEnv.get)
+        .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+        .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)

Review comment:
       Enable this only if explicitly configured ? So that we preserve behavior and see what the impact would be.
   We can make it a default in future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org