You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/10/10 02:28:45 UTC

[GitHub] [spark] wankunde opened a new pull request #34234: [SPARK-36967]Report accurate block size threshold per reduce task

wankunde opened a new pull request #34234:
URL: https://github.com/apache/spark/pull/34234


   ### What changes were proposed in this pull request?
   
   If a shuffle block size is bigger than SHUFFLE_ACCURATE_BLOCK_THRESHOLD_PER_REDUCE / number of map task, it will be accurate report.
   
   ### Why are the changes needed?
   
   Now map task will report accurate shuffle block size if the block size is greater than "spark.shuffle.accurateBlockThreshold"( 100M by default ). But if there are a large number of map tasks and the shuffle block sizes of these tasks are smaller than "spark.shuffle.accurateBlockThreshold", there may be unrecognized data skew.
   
   For example, there are 10000 map task and 10000 reduce task, and each map task create 50M shuffle blocks for reduce 0, and 10K shuffle blocks for the left reduce tasks, reduce 0 is data skew, but the stat of this plan do not have this information.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   
   Update exists UTs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r731503157



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted

Review comment:
       Given the cost, compute this only if required ?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted
+      if (totalNumBlocks % 2 == 0) {
+        Math.max((sortedSizes(totalNumBlocks / 2) + sortedSizes(totalNumBlocks / 2 - 1)) / 2, 1)
+      } else {
+        Math.max(sortedSizes(totalNumBlocks / 2), 1)
+      }
+    }
+    val skewSizeThreshold =
+      medianSize * Option(SparkEnv.get)
+        .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+        .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)

Review comment:
       Enable this only if explicitly configured ? So that we preserve behavior and see what the impact would be.
   We can make it a default in future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950800170


   **[Test build #144581 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144581/testReport)** for PR 34234 at commit [`8fa9e86`](https://github.com/apache/spark/commit/8fa9e86308ec6e66929a30c191cacfcafe2073a6).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-968859301


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49699/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-956071825


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49275/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955997664


   **[Test build #144805 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144805/testReport)** for PR 34234 at commit [`a1505ca`](https://github.com/apache/spark/commit/a1505cafaa964dbf3e808754f70a80877b0ba19a).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950631543


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144577/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r812573760



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,35 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val accurateBlockSkewedFactor = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)
+    val shuffleAccurateBlockThreshold =
+      Option(SparkEnv.get)
+        .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+        .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val threshold =
+      if (accurateBlockSkewedFactor > 0) {
+        val sortedSizes = uncompressedSizes.sorted
+        val medianSize: Long = Utils.median(sortedSizes)

Review comment:
       it seems we sort `uncompressedSizes` twice.  The `sortedSizes` is sorted but the `Utils.median` sort it again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r812589645



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,35 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val accurateBlockSkewedFactor = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)
+    val shuffleAccurateBlockThreshold =
+      Option(SparkEnv.get)
+        .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+        .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val threshold =
+      if (accurateBlockSkewedFactor > 0) {
+        val sortedSizes = uncompressedSizes.sorted
+        val medianSize: Long = Utils.median(sortedSizes)

Review comment:
       @wankunde looks good , thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-956028824


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49275/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-968859301


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49699/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950550704


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49048/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r734948812



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted

Review comment:
       Let us avoid unnecessary allocations and cost.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-968906734


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/145229/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955997664


   **[Test build #144805 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144805/testReport)** for PR 34234 at commit [`a1505ca`](https://github.com/apache/spark/commit/a1505cafaa964dbf3e808754f70a80877b0ba19a).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-939396991


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950514370


   Ok to test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r735359318



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1178,6 +1178,27 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR =
+    ConfigBuilder("spark.shuffle.accurateBlockSkewedFactor")
+      .doc("A shuffle block is considered as skewed and will be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than this factor multiplying " +
+        "the median shuffle block size or SHUFFLE_ACCURATE_BLOCK_THRESHOLD. It is " +
+        "recommended to set this parameter to be the same as SKEW_JOIN_SKEWED_PARTITION_FACTOR." +
+        "-1 to disable this feature by default.")
+      .version("3.3.0")
+      .intConf

Review comment:
       Updated this conf to `doubelConf`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950621196


   **[Test build #144577 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144577/testReport)** for PR 34234 at commit [`4068f58`](https://github.com/apache/spark/commit/4068f580de9b78d7679920945168b99ea8f6ce09).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r735226478



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted
+      if (totalNumBlocks % 2 == 0) {
+        Math.max((sortedSizes(totalNumBlocks / 2) + sortedSizes(totalNumBlocks / 2 - 1)) / 2, 1)
+      } else {
+        Math.max(sortedSizes(totalNumBlocks / 2), 1)
+      }
+    }
+    val skewSizeThreshold =
+      medianSize * Option(SparkEnv.get)
+        .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+        .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)

Review comment:
       Thanks @mridulm , I just updated code, and disable this behavior by default. Could you help me to review again ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950752682


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49052/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] attilapiros closed pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
attilapiros closed pull request #34234:
URL: https://github.com/apache/spark/pull/34234


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE]Report accurate block size threshold per reduce task

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-939396991


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] attilapiros commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
attilapiros commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955992340


   Jenkins retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955320457


   **[Test build #144784 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144784/testReport)** for PR 34234 at commit [`a1505ca`](https://github.com/apache/spark/commit/a1505cafaa964dbf3e808754f70a80877b0ba19a).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955320457


   **[Test build #144784 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144784/testReport)** for PR 34234 at commit [`a1505ca`](https://github.com/apache/spark/commit/a1505cafaa964dbf3e808754f70a80877b0ba19a).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r739661964



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,35 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val accurateBlockSkewedFactor = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)
+    val shuffleAccurateBlockThreshold =
+      Option(SparkEnv.get)
+        .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+        .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val threshold =
+      if (accurateBlockSkewedFactor > 0) {
+        val sortedSizes = uncompressedSizes.sorted
+        val medianSize: Long = Utils.median(sortedSizes)

Review comment:
       Updated code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950587581


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49048/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950645992


   **[Test build #144581 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144581/testReport)** for PR 34234 at commit [`8fa9e86`](https://github.com/apache/spark/commit/8fa9e86308ec6e66929a30c191cacfcafe2073a6).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r733679745



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted

Review comment:
       I don't think  the sort of thousands of number will cost too much ? Am i right ?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted
+      if (totalNumBlocks % 2 == 0) {
+        Math.max((sortedSizes(totalNumBlocks / 2) + sortedSizes(totalNumBlocks / 2 - 1)) / 2, 1)
+      } else {
+        Math.max(sortedSizes(totalNumBlocks / 2), 1)
+      }
+    }
+    val skewSizeThreshold =
+      medianSize * Option(SparkEnv.get)
+        .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+        .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)

Review comment:
       There are many long running skewed jobs in our cluster, but the driver does not recognize the skewed tasks.
   For example, the job in the PR description.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-976278207


   Hi, @Ngone51 @JoshRosen @attilapiros 
   Could you help me to review this PR?
   Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] attilapiros commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r739396666



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,35 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val accurateBlockSkewedFactor = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)
+    val shuffleAccurateBlockThreshold =
+      Option(SparkEnv.get)
+        .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+        .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val threshold =
+      if (accurateBlockSkewedFactor > 0) {
+        val sortedSizes = uncompressedSizes.sorted
+        val medianSize: Long = Utils.median(sortedSizes)

Review comment:
       Nit: `: Long` is not needed but as `medianSize` is only used once I would remove this val and use the expression at the calculation.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1178,6 +1178,27 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR =
+    ConfigBuilder("spark.shuffle.accurateBlockSkewedFactor")
+      .doc("A shuffle block is considered as skewed and will be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than this factor multiplying " +
+        "the median shuffle block size or SHUFFLE_ACCURATE_BLOCK_THRESHOLD. It is " +
+        "recommended to set this parameter to be the same as SKEW_JOIN_SKEWED_PARTITION_FACTOR." +
+        "Set to -1.0 to disable this feature by default.")
+      .version("3.3.0")
+      .doubleConf
+      .createWithDefault(-1.0)
+
+  private[spark] val SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER =
+    ConfigBuilder("spark.shuffle.maxAccurateSkewedBlockNumber")
+      .doc("Max skewed shuffle blocks allowed to be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than this factor multiplying " +

Review comment:
       this factor => SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR 

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,61 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes = Array.tabulate[Long](3000)(i => (if (i < 2990) i else i + 350 * 1024).toLong)
+    val avg = sizes.filter(_ < 3000).sum / sizes.count(i => i > 0 && i < 3000)
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until 3000) {
+      val estimate = status1.getSizeForBlock(i)
+      if (i < 2990) {
+        assert(estimate === avg)
+      } else {
+        assert(estimate === compressAndDecompressSize(sizes(i)))
+      }

Review comment:
       Nit: I think it is easier to follow this way:
   
   ```scala
       val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
       val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
       val sizes = smallBlockSizes ++: skewBlocksSizes 
       val avg = smallBlockSizes.sum / smallBlockSizes.length
       val loc = BlockManagerId("a", "b", 10)
       val mapTaskAttemptId = 5
       val status = MapStatus(loc, sizes, mapTaskAttemptId)
       val status1 = compressAndDecompressMapStatus(status)
       assert(status1.isInstanceOf[HighlyCompressedMapStatus])
       assert(status1.location == loc)
       assert(status1.mapId == mapTaskAttemptId)
       assert(status1.getSizeForBlock(0) == 0)
       for (i <- 1 until smallBlockSizes.length) {
         assert(status1.getSizeForBlock(i) === avg)
       }
       for (i <- 0 until skewBlocksSizes.length) {
         assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
           compressAndDecompressSize(skewBlocksSizes(i)))
       }
   ```

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,61 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes = Array.tabulate[Long](3000)(i => (if (i < 2990) i else i + 350 * 1024).toLong)
+    val avg = sizes.filter(_ < 3000).sum / sizes.count(i => i > 0 && i < 3000)
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until 3000) {
+      val estimate = status1.getSizeForBlock(i)
+      if (i < 2990) {
+        assert(estimate === avg)
+      } else {
+        assert(estimate === compressAndDecompressSize(sizes(i)))
+      }
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes = Array.tabulate[Long](3000)(i => (if (i < 2500) i else i + 3500 * 1024).toLong)

Review comment:
       Nit: Here I would also construct the `sizes` from the concatenation of `smallBlockSizes` and `skewedBlockSizes`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955516431


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49253/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] JoshRosen commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-946267118


   If we proceed with this approach then I think we should add new unit tests. We might be able to re-use test cases / logic from from https://github.com/apache/spark/pull/32733/files#diff-6f905716753c4647e146b60bb2e397cb19b7cda76f05208cbbdc891a5f16d54a 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-968746058


   **[Test build #145229 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/145229/testReport)** for PR 34234 at commit [`2083cc4`](https://github.com/apache/spark/commit/2083cc4a130287f62bea4dcd0b565b019a08d5c8).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-968889573


   **[Test build #145229 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/145229/testReport)** for PR 34234 at commit [`2083cc4`](https://github.com/apache/spark/commit/2083cc4a130287f62bea4dcd0b565b019a08d5c8).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950801894


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144581/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-948549219


   Hi, @Ngone51 @JoeyValentine @mridulm 
   I add a parameter to limit the number of reported shuffle blocks if there are too many huge skewed blocks.
   I think this is also helpful to limit the memory usage for MapStatus object.
   Could you help me to review this PR again? 
   Many thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950752682


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49052/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950697039


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49052/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955856248


   hi, @attilapiros 
   Jenkins fails to build due to StackOverflowError, could you help me to retest again? 
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-956114957


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144805/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r733611640



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted
+      if (totalNumBlocks % 2 == 0) {
+        Math.max((sortedSizes(totalNumBlocks / 2) + sortedSizes(totalNumBlocks / 2 - 1)) / 2, 1)
+      } else {
+        Math.max(sortedSizes(totalNumBlocks / 2), 1)
+      }
+    }

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955516431


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49253/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955388957


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49253/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-956114957


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144805/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950578600


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49048/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950529556


   **[Test build #144577 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144577/testReport)** for PR 34234 at commit [`4068f58`](https://github.com/apache/spark/commit/4068f580de9b78d7679920945168b99ea8f6ce09).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950631543


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144577/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950529556


   **[Test build #144577 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144577/testReport)** for PR 34234 at commit [`4068f58`](https://github.com/apache/spark/commit/4068f580de9b78d7679920945168b99ea8f6ce09).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r734948888



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted
+      if (totalNumBlocks % 2 == 0) {
+        Math.max((sortedSizes(totalNumBlocks / 2) + sortedSizes(totalNumBlocks / 2 - 1)) / 2, 1)
+      } else {
+        Math.max(sortedSizes(totalNumBlocks / 2), 1)
+      }
+    }
+    val skewSizeThreshold =
+      medianSize * Option(SparkEnv.get)
+        .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+        .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)

Review comment:
       If we are making a change to the current behavior, I would prefer to keep it disabled by default and enable it explicitly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r749165424



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1178,6 +1178,28 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR =
+    ConfigBuilder("spark.shuffle.accurateBlockSkewedFactor")
+      .doc("A shuffle block is considered as skewed and will be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than this factor multiplying " +
+        "the median shuffle block size or SHUFFLE_ACCURATE_BLOCK_THRESHOLD. It is " +
+        "recommended to set this parameter to be the same as SKEW_JOIN_SKEWED_PARTITION_FACTOR." +
+        "Set to -1.0 to disable this feature by default.")
+      .version("3.3.0")
+      .doubleConf
+      .createWithDefault(-1.0)
+
+  private[spark] val SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER =
+    ConfigBuilder("spark.shuffle.maxAccurateSkewedBlockNumber")
+      .doc("Max skewed shuffle blocks allowed to be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than " +
+        "SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR multiplying the median shuffle block size or " +
+        "SHUFFLE_ACCURATE_BLOCK_THRESHOLD.")
+      .version("3.3.0")
+      .intConf
+      .checkValue(_ > 0, "Allowed max accurate skewed block number must be positive.")
+      .createWithDefault(100)
+

Review comment:
       Thanks @mridulm , I have made these configs `internal()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-968906734


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/145229/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-969213986


   The changes look reasonable to me - but will want additional eyes on this.
   Any thoughts @Ngone51, @JoshRosen, @attilapiros ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-968795492


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49699/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] attilapiros commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r783072141



##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,70 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurate skewed block number if too many blocks are skewed") {
+    val skewedBlockNumber = 20

Review comment:
       nit: skewedBlockNumber => trackedSkewedBlocksLength

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,70 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurate skewed block number if too many blocks are skewed") {
+    val skewedBlockNumber = 20
+    val conf =
+      new SparkConf()
+        .set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+        .set(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER.key, skewedBlockNumber.toString)
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+      Array.tabulate[Long](500)(i => i + 3500 * 1024)
+    val emptyBlocksSize = 1
+    val smallBlockSizes = sizes.slice(emptyBlocksSize, sizes.size - skewedBlockNumber)
+    val skewBlocksSizes = sizes.slice(sizes.size - skewedBlockNumber, sizes.size)

Review comment:
       Here I used the old `skewedBlockNumber`, WDYT?
   ```suggestion
       val untrackedSkewedBlocksLength = 500
       val emptyBlocks = Array(0)
       val smallBlockSizes = Array.tabulate[Long](2500)(i => i + 1)
       val untrackedSkewedBlocksSizes = Array.tabulate[Long](untrackedSkewedBlocksLength)(i => i + 3500 * 1024)
       val trackedSkewedBlocksSizes = Array.tabulate[Long](skewedBlockNumber)(i => i + 4500 * 1024) 
       val nonEmptyBlocks = smallBlockSizes ++: untrackedSkewedBlocksSizes ++: trackedSkewedBlocksSizes
       val sizes = emptyBlocks ++: nonEmptyBlocks
       
       val avgNonEmpty =  nonEmptyBlocks.sum / nonEmptyBlocks.size
       assert(nonEmptyBlocks.filter(_ > avgNonEmpty).size == untrackedSkewedBlocksLength + skewedBlockNumber, "number of skewed block sizes")
       
       val smallAndUntrackedBlockSizes = nonEmptyBlocks.slice(0, nonEmptyBlocks.size - skewedBlockNumber)
       val avg = smallAndUntrackedBlockSizes.sum / smallAndUntrackedBlockSizes.length
       ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r812584351



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,35 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val accurateBlockSkewedFactor = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)
+    val shuffleAccurateBlockThreshold =
+      Option(SparkEnv.get)
+        .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+        .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val threshold =
+      if (accurateBlockSkewedFactor > 0) {
+        val sortedSizes = uncompressedSizes.sorted
+        val medianSize: Long = Utils.median(sortedSizes)

Review comment:
       Yes, `uncompressedSizes` is sorted twice. Maybe we can change `Utils.median(sizes: Array[Long])` to `Utils.median(sizes: Array[Long], alreadySorted: Boolean = false)`,  and change `Utils.median(sortedSizes)` to `Utils.median(sortedSizes, true)` to avoid this extra sort ? 
   
   @ulysses-you WDYH
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955461575


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49253/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955330033


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144784/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-956106022


   **[Test build #144805 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144805/testReport)** for PR 34234 at commit [`a1505ca`](https://github.com/apache/spark/commit/a1505cafaa964dbf3e808754f70a80877b0ba19a).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] JoshRosen commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r731369282



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted
+      if (totalNumBlocks % 2 == 0) {
+        Math.max((sortedSizes(totalNumBlocks / 2) + sortedSizes(totalNumBlocks / 2 - 1)) / 2, 1)
+      } else {
+        Math.max(sortedSizes(totalNumBlocks / 2), 1)
+      }
+    }

Review comment:
       It looks like this code is copied form https://github.com/apache/spark/blob/1b2bb38ecda7c8826bdf7a47f4dde8a60a3378da/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala#L71-L79
   
   Instead of copy-paste duplicating it, I think we should extract the common code into a `median(long[])` helper method in `org.apache.spark.Utils`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r735245433



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,37 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val accurateBlockSkewedFactor = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)
+    val threshold =
+      if (accurateBlockSkewedFactor > 0) {
+        val sortedSizes = uncompressedSizes.sorted
+        val medianSize: Long = Utils.median(sortedSizes)
+        val maxAccurateSkewedBlockNumber =
+          Math.min(
+            Option(SparkEnv.get)
+              .map(_.conf.get(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER))
+              .getOrElse(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER.defaultValue.get),
+            totalNumBlocks
+          )
+        val skewSizeThreshold =
+          Math.max(
+            medianSize * accurateBlockSkewedFactor,
+            sortedSizes(totalNumBlocks - maxAccurateSkewedBlockNumber)
+          )
+        Math.min(
+          Option(SparkEnv.get)
+            .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+            .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get),
+          skewSizeThreshold)
+      } else {
+        // Disable skew detection if accurateBlockSkewedFactor <= 0
+        Option(SparkEnv.get)
+          .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+          .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)

Review comment:
       nit: Avoid this duplication and pull this value out of the if/else ?

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1178,6 +1178,27 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR =
+    ConfigBuilder("spark.shuffle.accurateBlockSkewedFactor")
+      .doc("A shuffle block is considered as skewed and will be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than this factor multiplying " +
+        "the median shuffle block size or SHUFFLE_ACCURATE_BLOCK_THRESHOLD. It is " +
+        "recommended to set this parameter to be the same as SKEW_JOIN_SKEWED_PARTITION_FACTOR." +
+        "-1 to disable this feature by default.")
+      .version("3.3.0")
+      .intConf

Review comment:
       Do we want to make this a `doubleConf` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950801894


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144581/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950752615


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49052/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r735225991



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,24 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val medianSize: Long = {
+      val sortedSizes = uncompressedSizes.sorted

Review comment:
       Yes, I have updated PR, sort the shuffle blocks only if user enable this future manually. Is this OK?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r735358419



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,37 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val accurateBlockSkewedFactor = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)
+    val threshold =
+      if (accurateBlockSkewedFactor > 0) {
+        val sortedSizes = uncompressedSizes.sorted
+        val medianSize: Long = Utils.median(sortedSizes)
+        val maxAccurateSkewedBlockNumber =
+          Math.min(
+            Option(SparkEnv.get)
+              .map(_.conf.get(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER))
+              .getOrElse(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER.defaultValue.get),
+            totalNumBlocks
+          )
+        val skewSizeThreshold =
+          Math.max(
+            medianSize * accurateBlockSkewedFactor,
+            sortedSizes(totalNumBlocks - maxAccurateSkewedBlockNumber)
+          )
+        Math.min(
+          Option(SparkEnv.get)
+            .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+            .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get),
+          skewSizeThreshold)
+      } else {
+        // Disable skew detection if accurateBlockSkewedFactor <= 0
+        Option(SparkEnv.get)
+          .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+          .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)

Review comment:
       Finished to pull the code out of if/else.
   
   Thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950587581


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49048/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955330033


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144784/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r739662025



##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,61 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes = Array.tabulate[Long](3000)(i => (if (i < 2990) i else i + 350 * 1024).toLong)
+    val avg = sizes.filter(_ < 3000).sum / sizes.count(i => i > 0 && i < 3000)
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until 3000) {
+      val estimate = status1.getSizeForBlock(i)
+      if (i < 2990) {
+        assert(estimate === avg)
+      } else {
+        assert(estimate === compressAndDecompressSize(sizes(i)))
+      }

Review comment:
       Updated

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,61 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes = Array.tabulate[Long](3000)(i => (if (i < 2990) i else i + 350 * 1024).toLong)
+    val avg = sizes.filter(_ < 3000).sum / sizes.count(i => i > 0 && i < 3000)
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until 3000) {
+      val estimate = status1.getSizeForBlock(i)
+      if (i < 2990) {
+        assert(estimate === avg)
+      } else {
+        assert(estimate === compressAndDecompressSize(sizes(i)))
+      }
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes = Array.tabulate[Long](3000)(i => (if (i < 2500) i else i + 3500 * 1024).toLong)

Review comment:
       Updated




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r739661970



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1178,6 +1178,27 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR =
+    ConfigBuilder("spark.shuffle.accurateBlockSkewedFactor")
+      .doc("A shuffle block is considered as skewed and will be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than this factor multiplying " +
+        "the median shuffle block size or SHUFFLE_ACCURATE_BLOCK_THRESHOLD. It is " +
+        "recommended to set this parameter to be the same as SKEW_JOIN_SKEWED_PARTITION_FACTOR." +
+        "Set to -1.0 to disable this feature by default.")
+      .version("3.3.0")
+      .doubleConf
+      .createWithDefault(-1.0)
+
+  private[spark] val SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER =
+    ConfigBuilder("spark.shuffle.maxAccurateSkewedBlockNumber")
+      .doc("Max skewed shuffle blocks allowed to be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than this factor multiplying " +

Review comment:
       Updated




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-950645992


   **[Test build #144581 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144581/testReport)** for PR 34234 at commit [`8fa9e86`](https://github.com/apache/spark/commit/8fa9e86308ec6e66929a30c191cacfcafe2073a6).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-955329931


   **[Test build #144784 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144784/testReport)** for PR 34234 at commit [`a1505ca`](https://github.com/apache/spark/commit/a1505cafaa964dbf3e808754f70a80877b0ba19a).
    * This patch **fails to build**.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-956051046


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49275/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-956071825


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49275/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r746026685



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1178,6 +1178,28 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR =
+    ConfigBuilder("spark.shuffle.accurateBlockSkewedFactor")
+      .doc("A shuffle block is considered as skewed and will be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than this factor multiplying " +
+        "the median shuffle block size or SHUFFLE_ACCURATE_BLOCK_THRESHOLD. It is " +
+        "recommended to set this parameter to be the same as SKEW_JOIN_SKEWED_PARTITION_FACTOR." +
+        "Set to -1.0 to disable this feature by default.")
+      .version("3.3.0")
+      .doubleConf
+      .createWithDefault(-1.0)
+
+  private[spark] val SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER =
+    ConfigBuilder("spark.shuffle.maxAccurateSkewedBlockNumber")
+      .doc("Max skewed shuffle blocks allowed to be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than " +
+        "SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR multiplying the median shuffle block size or " +
+        "SHUFFLE_ACCURATE_BLOCK_THRESHOLD.")
+      .version("3.3.0")
+      .intConf
+      .checkValue(_ > 0, "Allowed max accurate skewed block number must be positive.")
+      .createWithDefault(100)
+

Review comment:
       Do we want to make these config's `internal()` and expose them after we see the effects of the config ? (I think I probably mentioned this earlier).
   If we end up evolving/changing how this reporting works, the config would end up getting modified/removed.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-968837172


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49699/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-968746058


   **[Test build #145229 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/145229/testReport)** for PR 34234 at commit [`2083cc4`](https://github.com/apache/spark/commit/2083cc4a130287f62bea4dcd0b565b019a08d5c8).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde closed pull request #34234: [WIP][SPARK-36967][CORE] Report accurate block size threshold per reduce task

Posted by GitBox <gi...@apache.org>.
wankunde closed pull request #34234:
URL: https://github.com/apache/spark/pull/34234


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-945745992


   cc @cloud-fan @JoshRosen 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] attilapiros commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r773385095



##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+      Array.tabulate[Long](500)(i => i + 3500 * 1024)
+    val emptyBlocksSize = sizes.filter(_ == 0).length

Review comment:
       This computation makes the reader think is there/what is the reason behind. 
   I think its right side can be simply replaced by just a literal 1.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+      Array.tabulate[Long](500)(i => i + 3500 * 1024)
+    val emptyBlocksSize = sizes.filter(_ == 0).length
+    val smallBlockSizes = sizes.slice(emptyBlocksSize, sizes.size - 100)
+    val skewBlocksSizes = sizes.slice(sizes.size - 100, sizes.size)
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until sizes.length - 100) {

Review comment:
       instead of 100 use the new val and also below

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")

Review comment:
       I suggest to set `SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER` to a an arbitrary number (a val) and use that val instead of the literal 100. This makes the test more readable.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+      Array.tabulate[Long](500)(i => i + 3500 * 1024)

Review comment:
       you can use the val introduced for setting `SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER` + a literal to emphasize we have more skew blocks here then kept track. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r783784419



##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,70 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurate skewed block number if too many blocks are skewed") {
+    val skewedBlockNumber = 20
+    val conf =
+      new SparkConf()
+        .set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+        .set(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER.key, skewedBlockNumber.toString)
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+      Array.tabulate[Long](500)(i => i + 3500 * 1024)
+    val emptyBlocksSize = 1
+    val smallBlockSizes = sizes.slice(emptyBlocksSize, sizes.size - skewedBlockNumber)
+    val skewBlocksSizes = sizes.slice(sizes.size - skewedBlockNumber, sizes.size)

Review comment:
       Thank you for your review.
   A small change to avoid misunderstanding.
   ```scala
       val skewThreshold = Utils.median(allBlocks.sorted) * accurateBlockSkewedFactor
       assert(nonEmptyBlocks.filter(_ > skewThreshold).size ==
         untrackedSkewedBlocksLength + trackedSkewedBlocksLength,
         "number of skewed block sizes")
   ```
   It is a skewed block only if it's size is greater than `the median block size * accurateBlockSkewedFactor`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r782682467



##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")

Review comment:
       Updated UT

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+      Array.tabulate[Long](500)(i => i + 3500 * 1024)

Review comment:
       Updated UT

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+      Array.tabulate[Long](500)(i => i + 3500 * 1024)
+    val emptyBlocksSize = sizes.filter(_ == 0).length
+    val smallBlockSizes = sizes.slice(emptyBlocksSize, sizes.size - 100)
+    val skewBlocksSizes = sizes.slice(sizes.size - 100, sizes.size)
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until sizes.length - 100) {

Review comment:
       Updated UT




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

Posted by GitBox <gi...@apache.org>.
wankunde commented on pull request #34234:
URL: https://github.com/apache/spark/pull/34234#issuecomment-1010595371


   @attilapiros Thanks for your review. I have updated UT, could you help review the code again. Thanks.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org