Posted to commits@spark.apache.org by sr...@apache.org on 2017/11/17 13:53:56 UTC

spark git commit: [SPARK-22540][SQL] Ensure HighlyCompressedMapStatus calculates correct avgSize

Repository: spark
Updated Branches:
  refs/heads/master b9dcbe5e1 -> d00b55d4b


[SPARK-22540][SQL] Ensure HighlyCompressedMapStatus calculates correct avgSize

## What changes were proposed in this pull request?

HighlyCompressedMapStatus tracks blocks at or above `SHUFFLE_ACCURATE_BLOCK_THRESHOLD` exactly and excludes them from `totalSize`, yet it divided that total by `numNonEmptyBlocks`, which still counts the huge blocks. Whenever huge blocks are present, `avgSize` is therefore underestimated. This patch counts the small blocks separately (`numSmallBlocks`) and computes `avgSize = totalSmallBlockSize / numSmallBlocks`.
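
A minimal, self-contained sketch of the arithmetic (plain Scala, no Spark dependencies; the sizes and threshold below are made-up illustrative values):

```scala
// Illustrative only: mirrors the averaging logic before and after this patch.
object AvgSizeSketch extends App {
  val threshold = 1000L                    // accurate-block threshold
  val sizes = Array(100L, 200L, 5000L)     // two small blocks, one huge block
  val nonEmptyBlocks = sizes.count(_ > 0)  // 3, includes the huge block
  val smallBlocks = sizes.filter(s => s > 0 && s < threshold)

  // Before: small-block total divided by ALL non-empty blocks.
  val oldAvg = smallBlocks.sum / nonEmptyBlocks     // 300 / 3 = 100
  // After: small-block total divided by the number of small blocks only.
  val newAvg = smallBlocks.sum / smallBlocks.length // 300 / 2 = 150

  println(s"old avgSize = $oldAvg, fixed avgSize = $newAvg")
}
```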

## How was this patch tested?

New unit test added.

Author: yucai <yu...@intel.com>

Closes #19765 from yucai/avgsize.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d00b55d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d00b55d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d00b55d4

Branch: refs/heads/master
Commit: d00b55d4b25ba0bf92983ff1bb47d8528e943737
Parents: b9dcbe5
Author: yucai <yu...@intel.com>
Authored: Fri Nov 17 07:53:53 2017 -0600
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Nov 17 07:53:53 2017 -0600

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/MapStatus.scala  | 10 +++++----
 .../apache/spark/scheduler/MapStatusSuite.scala | 22 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d00b55d4/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 5e45b37..2ec2f20 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -197,7 +197,8 @@ private[spark] object HighlyCompressedMapStatus {
     // block as being non-empty (or vice-versa) when using the average block size.
     var i = 0
     var numNonEmptyBlocks: Int = 0
-    var totalSize: Long = 0
+    var numSmallBlocks: Int = 0
+    var totalSmallBlockSize: Long = 0
     // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
     // blocks. From a performance standpoint, we benefit from tracking empty blocks because
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
@@ -214,7 +215,8 @@ private[spark] object HighlyCompressedMapStatus {
         // Huge blocks are not included in the calculation for average size, thus size for smaller
         // blocks is more accurate.
         if (size < threshold) {
-          totalSize += size
+          totalSmallBlockSize += size
+          numSmallBlocks += 1
         } else {
           hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
         }
@@ -223,8 +225,8 @@ private[spark] object HighlyCompressedMapStatus {
       }
       i += 1
     }
-    val avgSize = if (numNonEmptyBlocks > 0) {
-      totalSize / numNonEmptyBlocks
+    val avgSize = if (numSmallBlocks > 0) {
+      totalSmallBlockSize / numSmallBlocks
     } else {
       0
     }
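
For context on how this average is consumed: `getSizeForBlock` returns 0 for blocks recorded as empty, the exactly-tracked size for huge blocks, and `avgSize` for everything else, so a skewed average misestimates every small block. A simplified sketch of that dispatch (names mirror `HighlyCompressedMapStatus`, but this is a paraphrase, not the verbatim source):

```scala
// Simplified: empty blocks -> 0, huge blocks -> exact size, rest -> avgSize.
def getSizeForBlock(reduceId: Int,
                    emptyBlocks: Set[Int],
                    hugeBlockSizes: Map[Int, Long],
                    avgSize: Long): Long = {
  if (emptyBlocks.contains(reduceId)) 0L
  else hugeBlockSizes.getOrElse(reduceId, avgSize)
}
```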

http://git-wip-us.apache.org/repos/asf/spark/blob/d00b55d4/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 144e5af..2155a0f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -98,6 +98,28 @@ class MapStatusSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-22540: ensure HighlyCompressedMapStatus calculates correct avgSize") {
+    val threshold = 1000
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, threshold.toString)
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+    val sizes = (0L to 3000L).toArray
+    val smallBlockSizes = sizes.filter(n => n > 0 && n < threshold)
+    val avg = smallBlockSizes.sum / smallBlockSizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val status = MapStatus(loc, sizes)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.location == loc)
+    for (i <- 0 until threshold) {
+      val estimate = status1.getSizeForBlock(i)
+      if (sizes(i) > 0) {
+        assert(estimate === avg)
+      }
+    }
+  }
+
   def compressAndDecompressMapStatus(status: MapStatus): MapStatus = {
     val ser = new JavaSerializer(new SparkConf)
     val buf = ser.newInstance().serialize(status)

