You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/12/21 19:23:33 UTC

[GitHub] [spark] attilapiros commented on a change in pull request #34234: [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

attilapiros commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r773385095



##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+      Array.tabulate[Long](500)(i => i + 3500 * 1024)
+    val emptyBlocksSize = sizes.filter(_ == 0).length

Review comment:
       This computation makes the reader wonder whether there is a reason behind it, and if so, what it is.
   I think its right-hand side can simply be replaced by the literal 1.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+      Array.tabulate[Long](500)(i => i + 3500 * 1024)
+    val emptyBlocksSize = sizes.filter(_ == 0).length
+    val smallBlockSizes = sizes.slice(emptyBlocksSize, sizes.size - 100)
+    val skewBlocksSizes = sizes.slice(sizes.size - 100, sizes.size)
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until sizes.length - 100) {

Review comment:
       Instead of the literal 100, use the new val here and in the lines below.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")

Review comment:
       I suggest setting `SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER` to an arbitrary number (stored in a val) and using that val instead of the literal 100. This makes the test more readable.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  def compressAndDecompressSize(size: Long): Long = {
+    MapStatus.decompressSize(MapStatus.compressSize(size))
+  }
+
+  test("SPARK-36967: HighlyCompressedMapStatus should record accurately the size " +
+    "of skewed shuffle blocks") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+    val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+    val sizes = smallBlockSizes ++: skewBlocksSizes
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val mapTaskAttemptId = 5
+    val status = MapStatus(loc, sizes, mapTaskAttemptId)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    assert(status1.mapId == mapTaskAttemptId)
+    assert(status1.getSizeForBlock(0) == 0)
+    for (i <- 1 until smallBlockSizes.length) {
+      assert(status1.getSizeForBlock(i) === avg)
+    }
+    for (i <- 0 until skewBlocksSizes.length) {
+      assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+        compressAndDecompressSize(skewBlocksSizes(i)))
+    }
+  }
+
+  test("SPARK-36967: Limit accurated skewed block number if too many blocks are skewed") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+
+    val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+      Array.tabulate[Long](500)(i => i + 3500 * 1024)

Review comment:
       You can use the val introduced for setting `SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER` plus a literal here, to emphasize that there are more skewed blocks than the number that is tracked accurately.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org