Posted to commits@spark.apache.org by do...@apache.org on 2018/09/26 17:15:43 UTC

spark git commit: [SPARK-24519][CORE] Compute SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS only once

Repository: spark
Updated Branches:
  refs/heads/master bd2ae857d -> e702fb1d5


[SPARK-24519][CORE] Compute SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS only once

## What changes were proposed in this pull request?
Previously, SPARK-24519 made SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS a modifiable config. However, the config was parsed on every creation of a MapStatus, which could be very expensive. Another problem with the previous approach is that it created the illusion that the value could be changed dynamically at runtime, which was never true. This PR changes it so the config is read and cached only once.
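
For context, the change below follows a standard Scala pattern: hoist the config lookup into a lazy val on the object so the lookup runs once, on first access, instead of on every call. A minimal, self-contained sketch of the idea (StatusFactory, lookupThreshold, and the MIN_PARTS variable are illustrative stand-ins, not Spark's API):

    object StatusFactory {
      // Illustrative stand-in for an expensive config lookup (string
      // parsing, walking defaults, etc.) that should not repeat per call.
      private def lookupThreshold(): Int =
        sys.env.get("MIN_PARTS").map(_.toInt).getOrElse(2000)

      // Evaluated once, on first access; every later call reuses the
      // cached value. Trade-off: it can no longer change at runtime.
      private lazy val minPartsThreshold: Int = lookupThreshold()

      def choose(numPartitions: Int): String =
        if (numPartitions > minPartsThreshold) "highly-compressed"
        else "compressed"
    }

Note that this makes the same trade-off as the real change: lazy val initialization is synchronized and happens exactly once, so the threshold is effectively frozen after first use.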

## How was this patch tested?
Removed a test case that's no longer valid: it reconfigured SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS between MapStatus creations, which only worked while the config was re-read on every call.
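
To see why the removed test can no longer work, here is a tiny illustrative sketch (not Spark code) of lazy val's once-only semantics:

    object LazyOnceDemo {
      private var reads = 0
      private def readThreshold(): Int = { reads += 1; 2000 }

      // Initialized on first access and never re-evaluated afterwards.
      lazy val threshold: Int = readThreshold()

      def main(args: Array[String]): Unit = {
        println(threshold) // 2000; readThreshold() runs here
        println(threshold) // 2000; cached, readThreshold() not called again
        println(reads)     // 1 -- later config changes are invisible
      }
    }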

Closes #22521 from rxin/SPARK-24519.

Authored-by: Reynold Xin <rx...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e702fb1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e702fb1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e702fb1d

Branch: refs/heads/master
Commit: e702fb1d5218d062fcb8e618b92dad7958eb4062
Parents: bd2ae85
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Sep 26 10:15:16 2018 -0700
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Wed Sep 26 10:15:16 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/MapStatus.scala  | 12 ++++++---
 .../apache/spark/scheduler/MapStatusSuite.scala | 28 --------------------
 2 files changed, 9 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e702fb1d/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 659694d..0e221ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -49,10 +49,16 @@ private[spark] sealed trait MapStatus {
 
 private[spark] object MapStatus {
 
+  /**
+   * Min partition number to use [[HighlyCompressedMapStatus]]. A bit ugly here because in test
+   * code we can't assume SparkEnv.get exists.
+   */
+  private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get)
+    .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
+    .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)
+
   def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
-    if (uncompressedSizes.length >  Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
-      .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
+    if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
       HighlyCompressedMapStatus(loc, uncompressedSizes)
     } else {
       new CompressedMapStatus(loc, uncompressedSizes)

http://git-wip-us.apache.org/repos/asf/spark/blob/e702fb1d/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 354e638..2155a0f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -188,32 +188,4 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
-
-  test("SPARK-24519: HighlyCompressedMapStatus has configurable threshold") {
-    val conf = new SparkConf()
-    val env = mock(classOf[SparkEnv])
-    doReturn(conf).when(env).conf
-    SparkEnv.set(env)
-    val sizes = Array.fill[Long](500)(150L)
-    // Test default value
-    val status = MapStatus(null, sizes)
-    assert(status.isInstanceOf[CompressedMapStatus])
-    // Test Non-positive values
-    for (s <- -1 to 0) {
-      assertThrows[IllegalArgumentException] {
-        conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
-        val status = MapStatus(null, sizes)
-      }
-    }
-    // Test positive values
-    Seq(1, 100, 499, 500, 501).foreach { s =>
-      conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
-      val status = MapStatus(null, sizes)
-      if(sizes.length > s) {
-        assert(status.isInstanceOf[HighlyCompressedMapStatus])
-      } else {
-        assert(status.isInstanceOf[CompressedMapStatus])
-      }
-    }
-  }
 }

