You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/11/17 13:53:56 UTC
spark git commit: [SPARK-22540][SQL] Ensure HighlyCompressedMapStatus calculates correct avgSize
Repository: spark
Updated Branches:
refs/heads/master b9dcbe5e1 -> d00b55d4b
[SPARK-22540][SQL] Ensure HighlyCompressedMapStatus calculates correct avgSize
## What changes were proposed in this pull request?
Ensure HighlyCompressedMapStatus calculates correct avgSize
## How was this patch tested?
New unit test added.
Author: yucai <yu...@intel.com>
Closes #19765 from yucai/avgsize.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d00b55d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d00b55d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d00b55d4
Branch: refs/heads/master
Commit: d00b55d4b25ba0bf92983ff1bb47d8528e943737
Parents: b9dcbe5
Author: yucai <yu...@intel.com>
Authored: Fri Nov 17 07:53:53 2017 -0600
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Nov 17 07:53:53 2017 -0600
----------------------------------------------------------------------
.../org/apache/spark/scheduler/MapStatus.scala | 10 +++++----
.../apache/spark/scheduler/MapStatusSuite.scala | 22 ++++++++++++++++++++
2 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d00b55d4/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 5e45b37..2ec2f20 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -197,7 +197,8 @@ private[spark] object HighlyCompressedMapStatus {
// block as being non-empty (or vice-versa) when using the average block size.
var i = 0
var numNonEmptyBlocks: Int = 0
- var totalSize: Long = 0
+ var numSmallBlocks: Int = 0
+ var totalSmallBlockSize: Long = 0
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
@@ -214,7 +215,8 @@ private[spark] object HighlyCompressedMapStatus {
// Huge blocks are not included in the calculation for average size, thus size for smaller
// blocks is more accurate.
if (size < threshold) {
- totalSize += size
+ totalSmallBlockSize += size
+ numSmallBlocks += 1
} else {
hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
}
@@ -223,8 +225,8 @@ private[spark] object HighlyCompressedMapStatus {
}
i += 1
}
- val avgSize = if (numNonEmptyBlocks > 0) {
- totalSize / numNonEmptyBlocks
+ val avgSize = if (numSmallBlocks > 0) {
+ totalSmallBlockSize / numSmallBlocks
} else {
0
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d00b55d4/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 144e5af..2155a0f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -98,6 +98,28 @@ class MapStatusSuite extends SparkFunSuite {
}
}
+ test("SPARK-22540: ensure HighlyCompressedMapStatus calculates correct avgSize") {
+ val threshold = 1000
+ val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, threshold.toString)
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+ val sizes = (0L to 3000L).toArray
+ val smallBlockSizes = sizes.filter(n => n > 0 && n < threshold)
+ val avg = smallBlockSizes.sum / smallBlockSizes.length
+ val loc = BlockManagerId("a", "b", 10)
+ val status = MapStatus(loc, sizes)
+ val status1 = compressAndDecompressMapStatus(status)
+ assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+ assert(status1.location == loc)
+ for (i <- 0 until threshold) {
+ val estimate = status1.getSizeForBlock(i)
+ if (sizes(i) > 0) {
+ assert(estimate === avg)
+ }
+ }
+ }
+
def compressAndDecompressMapStatus(status: MapStatus): MapStatus = {
val ser = new JavaSerializer(new SparkConf)
val buf = ser.newInstance().serialize(status)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org