Posted to commits@spark.apache.org by an...@apache.org on 2016/01/15 02:37:30 UTC

spark git commit: [SPARK-12174] Speed up BlockManagerSuite getRemoteBytes() test

Repository: spark
Updated Branches:
  refs/heads/master bcc7373f6 -> 25782981c


[SPARK-12174] Speed up BlockManagerSuite getRemoteBytes() test

This patch significantly speeds up the BlockManagerSuite's "SPARK-9591: getRemoteBytes from another location when Exception throw" test, reducing the test time from 45s to ~250ms. The key change was to set `spark.shuffle.io.maxRetries` to 0; the test previously set `spark.network.timeout` to `2s`, but that made no difference because the slowdown was not caused by that timeout.
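
For readers skimming the diff: the fix amounts to disabling fetch retries on the test's SparkConf before the block managers are created. A minimal sketch of that setting, using the option names that appear in the diff below (the real test applies it to the conf built in beforeEach):

    val conf = new SparkConf(false)
      .set("spark.app.id", "test")
      // Fail a remote fetch immediately instead of retrying; the retries were
      // what made the test crawl once a peer block manager had been stopped.
      .set("spark.shuffle.io.maxRetries", "0")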

Along the way, I also cleaned up how BlockManagerSuite handles SparkConf: previously, every test mutated a shared SparkConf instance, whereas now each test gets a fresh SparkConf.
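
A sketch of the resulting per-test pattern, assuming a ScalaTest suite mixing in BeforeAndAfterEach like the one in the diff (the class name and the single config option here are illustrative, not the full suite):

    import org.scalatest.BeforeAndAfterEach
    import org.apache.spark.{SparkConf, SparkFunSuite}

    class ExampleConfSuite extends SparkFunSuite with BeforeAndAfterEach {
      var conf: SparkConf = null

      override def beforeEach(): Unit = {
        super.beforeEach()
        // Every test starts from a fresh, isolated SparkConf, so settings made
        // by one test can no longer leak into the next.
        conf = new SparkConf(false).set("spark.app.id", "test")
      }

      override def afterEach(): Unit = {
        try {
          conf = null
        } finally {
          super.afterEach()
        }
      }
    }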

Author: Josh Rosen <jo...@databricks.com>

Closes #10759 from JoshRosen/SPARK-12174.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25782981
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25782981
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25782981

Branch: refs/heads/master
Commit: 25782981cf58946dc7c186acadd2beec5d964461
Parents: bcc7373
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Jan 14 17:37:27 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Jan 14 17:37:27 2016 -0800

----------------------------------------------------------------------
 .../spark/storage/BlockManagerSuite.scala       | 71 +++++++++-----------
 1 file changed, 30 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/25782981/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 67210e5..62e6c4f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -45,20 +45,18 @@ import org.apache.spark.util._
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   with PrivateMethodTester with ResetSystemProperties {
 
-  private val conf = new SparkConf(false).set("spark.app.id", "test")
+  var conf: SparkConf = null
   var store: BlockManager = null
   var store2: BlockManager = null
   var store3: BlockManager = null
   var rpcEnv: RpcEnv = null
   var master: BlockManagerMaster = null
-  conf.set("spark.authenticate", "false")
-  val securityMgr = new SecurityManager(conf)
-  val mapOutputTracker = new MapOutputTrackerMaster(conf)
-  val shuffleManager = new HashShuffleManager(conf)
+  val securityMgr = new SecurityManager(new SparkConf(false))
+  val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false))
+  val shuffleManager = new HashShuffleManager(new SparkConf(false))
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  conf.set("spark.kryoserializer.buffer", "1m")
-  val serializer = new KryoSerializer(conf)
+  val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
 
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -79,15 +77,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   override def beforeEach(): Unit = {
     super.beforeEach()
-    rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
-
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     System.setProperty("os.arch", "amd64")
-    conf.set("os.arch", "amd64")
-    conf.set("spark.test.useCompressedOops", "true")
+    conf = new SparkConf(false)
+      .set("spark.app.id", "test")
+      .set("spark.kryoserializer.buffer", "1m")
+      .set("spark.test.useCompressedOops", "true")
+      .set("spark.storage.unrollFraction", "0.4")
+      .set("spark.storage.unrollMemoryThreshold", "512")
+
+    rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    conf.set("spark.storage.unrollFraction", "0.4")
-    conf.set("spark.storage.unrollMemoryThreshold", "512")
 
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
@@ -98,6 +98,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   override def afterEach(): Unit = {
     try {
+      conf = null
       if (store != null) {
         store.stop()
         store = null
@@ -473,34 +474,22 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
-    val origTimeoutOpt = conf.getOption("spark.network.timeout")
-    try {
-      conf.set("spark.network.timeout", "2s")
-      store = makeBlockManager(8000, "executor1")
-      store2 = makeBlockManager(8000, "executor2")
-      store3 = makeBlockManager(8000, "executor3")
-      val list1 = List(new Array[Byte](4000))
-      store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-      store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-      var list1Get = store.getRemoteBytes("list1")
-      assert(list1Get.isDefined, "list1Get expected to be fetched")
-      // block manager exit
-      store2.stop()
-      store2 = null
-      list1Get = store.getRemoteBytes("list1")
-      // get `list1` block
-      assert(list1Get.isDefined, "list1Get expected to be fetched")
-      store3.stop()
-      store3 = null
-      // exception throw because there is no locations
-      intercept[BlockFetchException] {
-        list1Get = store.getRemoteBytes("list1")
-      }
-    } finally {
-      origTimeoutOpt match {
-        case Some(t) => conf.set("spark.network.timeout", t)
-        case None => conf.remove("spark.network.timeout")
-      }
+    conf.set("spark.shuffle.io.maxRetries", "0")
+    store = makeBlockManager(8000, "executor1")
+    store2 = makeBlockManager(8000, "executor2")
+    store3 = makeBlockManager(8000, "executor3")
+    val list1 = List(new Array[Byte](4000))
+    store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
+    store2.stop()
+    store2 = null
+    assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
+    store3.stop()
+    store3 = null
+    // exception throw because there is no locations
+    intercept[BlockFetchException] {
+      store.getRemoteBytes("list1")
     }
   }
 

