You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/03/02 19:26:50 UTC

[1/2] spark git commit: [SPARK-12817] Add BlockManager.getOrElseUpdate and remove CacheManager

Repository: spark
Updated Branches:
  refs/heads/master 8f8d8a231 -> d6969ffc0


http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e4ab9ee..89b4270 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -172,9 +172,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
@@ -205,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
     store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
     assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
     assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
@@ -218,9 +218,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
-    store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
@@ -265,9 +265,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
     // Putting a1, a2 and a3 in memory.
-    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = false)
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
@@ -283,8 +283,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       master.getLocations("nonrddblock") should have size (1)
     }
 
-    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = true)
     store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
     master.getLocations(rdd(0, 0)) should have size 0
@@ -308,10 +308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // insert broadcast blocks in both the stores
     Seq(driverStore, executorStore).foreach { case s =>
-      s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
-      s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
-      s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
-      s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
     }
 
     // verify whether the blocks exist in both the stores
@@ -366,7 +366,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
 
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -384,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
 
     assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -407,13 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       master.removeExecutor(store.blockManagerId.executorId)
       val t1 = new Thread {
         override def run() {
-          store.putIteratorAndReleaseLock(
+          store.putIterator(
             "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
         }
       }
       val t2 = new Thread {
         override def run() {
-          store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+          store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
         }
       }
       val t3 = new Thread {
@@ -441,11 +441,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
     val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
     val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
@@ -486,9 +486,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store2 = makeBlockManager(8000, "executor2")
     store3 = makeBlockManager(8000, "executor3")
     val list1 = List(new Array[Byte](4000))
-    store2.putIteratorAndReleaseLock(
+    store2.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store3.putIteratorAndReleaseLock(
+    store3.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
     store2.stop()
@@ -515,15 +515,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingleAndReleaseLock("a1", a1, storageLevel)
-    store.putSingleAndReleaseLock("a2", a2, storageLevel)
-    store.putSingleAndReleaseLock("a3", a3, storageLevel)
+    store.putSingle("a1", a1, storageLevel)
+    store.putSingle("a2", a2, storageLevel)
+    store.putSingle("a3", a3, storageLevel)
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
-    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    store.putSingle("a1", a1, storageLevel)
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
@@ -534,9 +534,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
     // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
     // from the same RDD
     assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
@@ -550,9 +550,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("in-memory LRU for partitions of multiple RDDs") {
     store = makeBlockManager(12000)
-    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // At this point rdd_1_1 should've replaced rdd_0_1
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@@ -560,8 +560,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Do a get() on rdd_0_2 so that it is the most recently used item
     assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
-    store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
     assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -576,9 +576,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY)
-    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+    store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+    store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
+    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
@@ -607,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingleAndReleaseLock("a1", a1, storageLevel)
-    store.putSingleAndReleaseLock("a2", a2, storageLevel)
-    store.putSingleAndReleaseLock("a3", a3, storageLevel)
+    store.putSingle("a1", a1, storageLevel)
+    store.putSingle("a2", a2, storageLevel)
+    store.putSingle("a3", a3, storageLevel)
     assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
     assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
     assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
@@ -624,15 +624,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
     val a4 = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
     // At this point LRU should not kick in because a3 is only on disk
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
     // Now let's add in a4, which uses both disk and memory; a1 should drop out
-    store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
     assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
@@ -644,11 +644,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
@@ -658,7 +658,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
@@ -674,11 +674,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val listForSizeEstimate = new ArrayBuffer[Any]
     listForSizeEstimate ++= list1.iterator
@@ -697,7 +697,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
     assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
@@ -722,9 +722,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("overly large block") {
     store = makeBlockManager(5000)
-    store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
-    store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
     assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
   }
@@ -733,7 +733,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     try {
       conf.set("spark.shuffle.compress", "true")
       store = makeBlockManager(20000, "exec1")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
@@ -742,7 +742,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.shuffle.compress", "false")
       store = makeBlockManager(20000, "exec2")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
@@ -751,7 +751,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "true")
       store = makeBlockManager(20000, "exec3")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
@@ -760,7 +760,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "false")
       store = makeBlockManager(20000, "exec4")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
       store.stop()
@@ -768,21 +768,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.rdd.compress", "true")
       store = makeBlockManager(20000, "exec5")
-      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "false")
       store = makeBlockManager(20000, "exec6")
-      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
       store = makeBlockManager(20000, "exec7")
-      store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+      store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
       store.stop()
       store = null
@@ -810,7 +810,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     class UnserializableClass
     val a1 = new UnserializableClass
     intercept[java.io.NotSerializableException] {
-      store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
+      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
     }
 
     // Make sure get a1 doesn't hang and returns None.
@@ -882,7 +882,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list1)
     val updatedBlocks1 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks1.size === 1)
@@ -891,7 +891,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list2)
     val updatedBlocks2 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     }
     assert(updatedBlocks2.size === 1)
@@ -900,7 +900,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list1 is kicked out of memory while list3 is added
     val updatedBlocks3 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks3.size === 2)
@@ -915,7 +915,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
     val updatedBlocks4 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks4.size === 2)
@@ -931,7 +931,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // No updated blocks - list5 is too big to fit in store and nothing is kicked out
     val updatedBlocks5 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks5.size === 0)
@@ -956,11 +956,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
@@ -975,11 +975,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
 
     // This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // getLocations should return nothing because the master is not informed
@@ -1001,11 +1001,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
@@ -1015,11 +1015,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       === 1)
 
     // insert some more blocks
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
@@ -1030,7 +1030,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
@@ -1042,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
     store = makeBlockManager(12000)
-    store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
     assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
     // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // rdd_1_0 should have been replaced, even it's not least recently used.
     assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
     assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
@@ -1126,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     memoryStore.releasePendingUnrollMemoryForThisTask()
 
     // Unroll with not enough space. This should succeed after kicking out someBlock1.
-    store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
@@ -1138,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
     // In the mean time, however, we kicked out someBlock2 before giving up.
-    store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
     verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
@@ -1170,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // would not know how to drop them from memory later.
     memoryStore.remove("b1")
     memoryStore.remove("b2")
-    store.putIteratorAndReleaseLock("b1", smallIterator, memOnly)
-    store.putIteratorAndReleaseLock("b2", smallIterator, memOnly)
+    store.putIterator("b1", smallIterator, memOnly)
+    store.putIterator("b2", smallIterator, memOnly)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
@@ -1182,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.contains("b3"))
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     memoryStore.remove("b3")
-    store.putIteratorAndReleaseLock("b3", smallIterator, memOnly)
+    store.putIterator("b3", smallIterator, memOnly)
 
     // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
     val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
@@ -1209,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk)
-    store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk)
+    store.putIterator("b1", smallIterator, memAndDisk)
+    store.putIterator("b2", smallIterator, memAndDisk)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     // Memory store should contain b2 and b3, while disk store should contain only b1
@@ -1223,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(!diskStore.contains("b2"))
     assert(!diskStore.contains("b3"))
     memoryStore.remove("b3")
-    store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll huge block with not enough space. This should fail and drop the new block to disk
@@ -1310,12 +1310,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(12000)
     val arr = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
-    store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a2", arr, StorageLevel.MEMORY_ONLY_SER)
     assert(store.getSingle("a1").isDefined, "a1 was not in store")
     assert(store.getSingle("a2").isDefined, "a2 was not in store")
     // This put should fail because both a1 and a2 should be read-locked:
-    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
     assert(store.getSingle("a3").isEmpty, "a3 was in store")
     assert(store.getSingle("a1").isDefined, "a1 was not in store")
     assert(store.getSingle("a2").isDefined, "a2 was not in store")
@@ -1324,7 +1324,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.releaseLock("a2")
     // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
     // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
-    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
     assert(store.getSingle("a2").isEmpty, "a2 was in store")
     assert(store.getSingle("a1").isDefined, "a1 was not in store")
     assert(store.getSingle("a3").isDefined, "a3 was not in store")
@@ -1335,41 +1335,6 @@ private object BlockManagerSuite {
 
   private implicit class BlockManagerTestUtils(store: BlockManager) {
 
-    def putSingleAndReleaseLock(
-        block: BlockId,
-        value: Any,
-        storageLevel: StorageLevel,
-        tellMaster: Boolean): Unit = {
-      if (store.putSingle(block, value, storageLevel, tellMaster)) {
-        store.releaseLock(block)
-      }
-    }
-
-    def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = {
-      if (store.putSingle(block, value, storageLevel)) {
-        store.releaseLock(block)
-      }
-    }
-
-    def putIteratorAndReleaseLock(
-        blockId: BlockId,
-        values: Iterator[Any],
-        level: StorageLevel): Unit = {
-      if (store.putIterator(blockId, values, level)) {
-        store.releaseLock(blockId)
-      }
-    }
-
-    def putIteratorAndReleaseLock(
-        blockId: BlockId,
-        values: Iterator[Any],
-        level: StorageLevel,
-        tellMaster: Boolean): Unit = {
-      if (store.putIterator(blockId, values, level, tellMaster)) {
-        store.releaseLock(blockId)
-      }
-    }
-
     def dropFromMemoryIfExists(
         blockId: BlockId,
         data: () => Either[Array[Any], ByteBuffer]): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 3d9c085..e22e320 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -91,8 +91,6 @@ private[streaming] class BlockManagerBasedBlockHandler(
     if (!putSucceeded) {
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
-    } else {
-      blockManager.releaseLock(blockId)
     }
     BlockManagerBasedStoreResult(blockId, numRecords)
   }
@@ -191,8 +189,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
       if (!putSucceeded) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
-      } else {
-        blockManager.releaseLock(blockId)
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-12817] Add BlockManager.getOrElseUpdate and remove CacheManager

Posted by an...@apache.org.
[SPARK-12817] Add BlockManager.getOrElseUpdate and remove CacheManager

CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication.

Thanks to the addition of block-level read/write locks in #10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method.

This pull request replaces / subsumes #10748.

/cc andrewor14 and nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting`), so please pay attention to the Scaladoc changes and new test cases for those methods.

Author: Josh Rosen <jo...@databricks.com>

Closes #11436 from JoshRosen/remove-cachemanager.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6969ffc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6969ffc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6969ffc

Branch: refs/heads/master
Commit: d6969ffc0f86c8a4ea0e94d06cb227178b000962
Parents: 8f8d8a2
Author: Josh Rosen <jo...@databricks.com>
Authored: Wed Mar 2 10:26:47 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Mar 2 10:26:47 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   | 179 --------------
 .../main/scala/org/apache/spark/SparkEnv.scala  |   4 -
 .../spark/broadcast/TorrentBroadcast.scala      |  44 ++--
 .../org/apache/spark/executor/Executor.scala    |   5 +-
 .../network/netty/NettyBlockRpcServer.scala     |   5 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  33 ++-
 .../apache/spark/storage/BlockInfoManager.scala | 107 ++++-----
 .../org/apache/spark/storage/BlockManager.scala | 127 ++++++----
 .../org/apache/spark/storage/BlockStore.scala   |   8 -
 .../org/apache/spark/storage/DiskStore.scala    |   8 -
 .../org/apache/spark/storage/MemoryStore.scala  |  28 +--
 .../org/apache/spark/CacheManagerSuite.scala    |  97 --------
 .../spark/storage/BlockInfoManagerSuite.scala   |  73 +++++-
 .../storage/BlockManagerReplicationSuite.scala  |   5 -
 .../spark/storage/BlockManagerSuite.scala       | 235 ++++++++-----------
 .../receiver/ReceivedBlockHandler.scala         |   4 -
 16 files changed, 365 insertions(+), 597 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
deleted file mode 100644
index 2b456fa..0000000
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import scala.collection.mutable
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage._
-import org.apache.spark.util.CompletionIterator
-
-/**
- * Spark class responsible for passing RDDs partition contents to the BlockManager and making
- * sure a node doesn't load two copies of an RDD at once.
- */
-private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
-
-  /** Keys of RDD partitions that are being computed/loaded. */
-  private val loading = new mutable.HashSet[RDDBlockId]
-
-  /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
-  def getOrCompute[T](
-      rdd: RDD[T],
-      partition: Partition,
-      context: TaskContext,
-      storageLevel: StorageLevel): Iterator[T] = {
-
-    val key = RDDBlockId(rdd.id, partition.index)
-    logDebug(s"Looking for partition $key")
-    blockManager.get(key) match {
-      case Some(blockResult) =>
-        // Partition is already materialized, so just return its values
-        val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
-        existingMetrics.incBytesReadInternal(blockResult.bytes)
-
-        val iter = blockResult.data.asInstanceOf[Iterator[T]]
-
-        new InterruptibleIterator[T](context, iter) {
-          override def next(): T = {
-            existingMetrics.incRecordsReadInternal(1)
-            delegate.next()
-          }
-        }
-      case None =>
-        // Acquire a lock for loading this partition
-        // If another thread already holds the lock, wait for it to finish return its results
-        val storedValues = acquireLockForPartition[T](key)
-        if (storedValues.isDefined) {
-          return new InterruptibleIterator[T](context, storedValues.get)
-        }
-
-        // Otherwise, we have to load the partition ourselves
-        try {
-          logInfo(s"Partition $key not found, computing it")
-          val computedValues = rdd.computeOrReadCheckpoint(partition, context)
-          val cachedValues = putInBlockManager(key, computedValues, storageLevel)
-          new InterruptibleIterator(context, cachedValues)
-        } finally {
-          loading.synchronized {
-            loading.remove(key)
-            loading.notifyAll()
-          }
-        }
-    }
-  }
-
-  /**
-   * Acquire a loading lock for the partition identified by the given block ID.
-   *
-   * If the lock is free, just acquire it and return None. Otherwise, another thread is already
-   * loading the partition, so we wait for it to finish and return the values loaded by the thread.
-   */
-  private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
-    loading.synchronized {
-      if (!loading.contains(id)) {
-        // If the partition is free, acquire its lock to compute its value
-        loading.add(id)
-        None
-      } else {
-        // Otherwise, wait for another thread to finish and return its result
-        logInfo(s"Another thread is loading $id, waiting for it to finish...")
-        while (loading.contains(id)) {
-          try {
-            loading.wait()
-          } catch {
-            case e: Exception =>
-              logWarning(s"Exception while waiting for another thread to load $id", e)
-          }
-        }
-        logInfo(s"Finished waiting for $id")
-        val values = blockManager.get(id)
-        if (!values.isDefined) {
-          /* The block is not guaranteed to exist even after the other thread has finished.
-           * For instance, the block could be evicted after it was put, but before our get.
-           * In this case, we still need to load the partition ourselves. */
-          logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
-          loading.add(id)
-        }
-        values.map(_.data.asInstanceOf[Iterator[T]])
-      }
-    }
-  }
-
-  /**
-   * Cache the values of a partition, keeping track of any updates in the storage statuses of
-   * other blocks along the way.
-   *
-   * The effective storage level refers to the level that actually specifies BlockManager put
-   * behavior, not the level originally specified by the user. This is mainly for forcing a
-   * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
-   * while preserving the original semantics of the RDD as specified by the application.
-   */
-  private def putInBlockManager[T](
-      key: BlockId,
-      values: Iterator[T],
-      level: StorageLevel,
-      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
-
-    val putLevel = effectiveStorageLevel.getOrElse(level)
-    if (!putLevel.useMemory) {
-      /*
-       * This RDD is not to be cached in memory, so we can just pass the computed values as an
-       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
-       */
-      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
-      blockManager.get(key) match {
-        case Some(v) => v.data.asInstanceOf[Iterator[T]]
-        case None =>
-          logInfo(s"Failure to store $key")
-          throw new BlockException(key, s"Block manager failed to return cached value for $key!")
-      }
-    } else {
-      /*
-       * This RDD is to be cached in memory. In this case we cannot pass the computed values
-       * to the BlockManager as an iterator and expect to read it back later. This is because
-       * we may end up dropping a partition from memory store before getting it back.
-       *
-       * In addition, we must be careful to not unroll the entire partition in memory at once.
-       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
-       * single partition. Instead, we unroll the values cautiously, potentially aborting and
-       * dropping the partition to disk if applicable.
-       */
-      blockManager.memoryStore.unrollSafely(key, values) match {
-        case Left(arr) =>
-          // We have successfully unrolled the entire partition, so cache it in memory
-          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
-          CompletionIterator[T, Iterator[T]](
-            arr.iterator.asInstanceOf[Iterator[T]],
-            blockManager.releaseLock(key))
-        case Right(it) =>
-          // There is not enough space to cache this partition in memory
-          val returnValues = it.asInstanceOf[Iterator[T]]
-          if (putLevel.useDisk) {
-            logWarning(s"Persisting partition $key to disk instead.")
-            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
-              useOffHeap = false, deserialized = false, putLevel.replication)
-            putInBlockManager[T](key, returnValues, level, Some(diskOnlyLevel))
-          } else {
-            returnValues
-          }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 204f735..b3b3729 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -56,7 +56,6 @@ class SparkEnv (
     private[spark] val rpcEnv: RpcEnv,
     val serializer: Serializer,
     val closureSerializer: Serializer,
-    val cacheManager: CacheManager,
     val mapOutputTracker: MapOutputTracker,
     val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
@@ -333,8 +332,6 @@ object SparkEnv extends Logging {
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
 
-    val cacheManager = new CacheManager(blockManager)
-
     val metricsSystem = if (isDriver) {
       // Don't start metrics system right now for Driver.
       // We need to wait for the task scheduler to give us an app ID.
@@ -371,7 +368,6 @@ object SparkEnv extends Logging {
       rpcEnv,
       serializer,
       closureSerializer,
-      cacheManager,
       mapOutputTracker,
       shuffleManager,
       broadcastManager,

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index c08f87a..dabc810 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -99,18 +99,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     // Store a copy of the broadcast variable in the driver so that tasks run on the driver
     // do not create a duplicate copy of the broadcast variable's value.
     val blockManager = SparkEnv.get.blockManager
-    if (blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
-      blockManager.releaseLock(broadcastId)
-    } else {
+    if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
       throw new SparkException(s"Failed to store $broadcastId in BlockManager")
     }
     val blocks =
       TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
     blocks.zipWithIndex.foreach { case (block, i) =>
       val pieceId = BroadcastBlockId(id, "piece" + i)
-      if (blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
-        blockManager.releaseLock(pieceId)
-      } else {
+      if (!blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
         throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
       }
     }
@@ -130,22 +126,24 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       // First try getLocalBytes because there is a chance that previous attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
-      def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
-      def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
-        // If we found the block from remote executors/driver's BlockManager, put the block
-        // in this executor's BlockManager.
-        if (!bm.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
-          throw new SparkException(
-            s"Failed to store $pieceId of $broadcastId in local BlockManager")
-        }
-        block
+      bm.getLocalBytes(pieceId) match {
+        case Some(block) =>
+          blocks(pid) = block
+          releaseLock(pieceId)
+        case None =>
+          bm.getRemoteBytes(pieceId) match {
+            case Some(b) =>
+              // We found the block from remote executors/driver's BlockManager, so put the block
+              // in this executor's BlockManager.
+              if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
+                throw new SparkException(
+                  s"Failed to store $pieceId of $broadcastId in local BlockManager")
+              }
+              blocks(pid) = b
+            case None =>
+              throw new SparkException(s"Failed to get $pieceId of $broadcastId")
+          }
       }
-      val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
-        throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
-      // At this point we are guaranteed to hold a read lock, since we either got the block locally
-      // or stored the remotely-fetched block and automatically downgraded the write lock.
-      blocks(pid) = block
-      releaseLock(pieceId)
     }
     blocks
   }
@@ -191,9 +189,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
           // Store the merged copy in BlockManager so other tasks on this executor don't
           // need to re-fetch it.
           val storageLevel = StorageLevel.MEMORY_AND_DISK
-          if (blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
-            releaseLock(broadcastId)
-          } else {
+          if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
             throw new SparkException(s"Failed to store $broadcastId in BlockManager")
           }
           obj

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a959f20..e88d6cd 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -292,11 +292,8 @@ private[spark] class Executor(
             ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
           } else if (resultSize >= maxRpcMessageSize) {
             val blockId = TaskResultBlockId(taskId)
-            val putSucceeded = env.blockManager.putBytes(
+            env.blockManager.putBytes(
               blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
-            if (putSucceeded) {
-              env.blockManager.releaseLock(blockId)
-            }
             logInfo(
               s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
             ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index e4246df..e86933b 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -66,10 +66,7 @@ class NettyBlockRpcServer(
           serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
         val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
         val blockId = BlockId(uploadBlock.blockId)
-        val putSucceeded = blockManager.putBlockData(blockId, data, level)
-        if (putSucceeded) {
-          blockManager.releaseLock(blockId)
-        }
+        blockManager.putBlockData(blockId, data, level)
         responseContext.onSuccess(ByteBuffer.allocate(0))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6a6ad2d..e5fdebc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -37,7 +37,7 @@ import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
@@ -272,7 +272,7 @@ abstract class RDD[T: ClassTag](
    */
   final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
     if (storageLevel != StorageLevel.NONE) {
-      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
+      getOrCompute(split, context)
     } else {
       computeOrReadCheckpoint(split, context)
     }
@@ -315,6 +315,35 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
+   */
+  private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
+    val blockId = RDDBlockId(id, partition.index)
+    var readCachedBlock = true
+    // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
+    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => {
+      readCachedBlock = false
+      computeOrReadCheckpoint(partition, context)
+    }) match {
+      case Left(blockResult) =>
+        if (readCachedBlock) {
+          val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
+          existingMetrics.incBytesReadInternal(blockResult.bytes)
+          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
+            override def next(): T = {
+              existingMetrics.incRecordsReadInternal(1)
+              delegate.next()
+            }
+          }
+        } else {
+          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
+        }
+      case Right(iter) =>
+        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
+    }
+  }
+
+  /**
    * Execute a block of code in a scope such that all new RDDs created in this body will
    * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 0eda97e..b23244a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -71,27 +71,13 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
     _writerTask = t
     checkInvariants()
   }
-  private[this] var _writerTask: Long = 0
-
-  /**
-   * True if this block has been removed from the BlockManager and false otherwise.
-   * This field is used to communicate block deletion to blocked readers / writers (see its usage
-   * in [[BlockInfoManager]]).
-   */
-  def removed: Boolean = _removed
-  def removed_=(r: Boolean): Unit = {
-    _removed = r
-    checkInvariants()
-  }
-  private[this] var _removed: Boolean = false
+  private[this] var _writerTask: Long = BlockInfo.NO_WRITER
 
   private def checkInvariants(): Unit = {
     // A block's reader count must be non-negative:
     assert(_readerCount >= 0)
     // A block is either locked for reading or for writing, but not for both at the same time:
     assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
-    // If a block is removed then it is not locked:
-    assert(!_removed || (_readerCount == 0 && _writerTask == BlockInfo.NO_WRITER))
   }
 
   checkInvariants()
@@ -195,16 +181,22 @@ private[storage] class BlockInfoManager extends Logging {
       blockId: BlockId,
       blocking: Boolean = true): Option[BlockInfo] = synchronized {
     logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
-    infos.get(blockId).map { info =>
-      while (info.writerTask != BlockInfo.NO_WRITER) {
-        if (blocking) wait() else return None
+    do {
+      infos.get(blockId) match {
+        case None => return None
+        case Some(info) =>
+          if (info.writerTask == BlockInfo.NO_WRITER) {
+            info.readerCount += 1
+            readLocksByTask(currentTaskAttemptId).add(blockId)
+            logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
+            return Some(info)
+          }
       }
-      if (info.removed) return None
-      info.readerCount += 1
-      readLocksByTask(currentTaskAttemptId).add(blockId)
-      logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
-      info
-    }
+      if (blocking) {
+        wait()
+      }
+    } while (blocking)
+    None
   }
 
   /**
@@ -226,21 +218,25 @@ private[storage] class BlockInfoManager extends Logging {
       blockId: BlockId,
       blocking: Boolean = true): Option[BlockInfo] = synchronized {
     logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
-    infos.get(blockId).map { info =>
-      if (info.writerTask == currentTaskAttemptId) {
-        throw new IllegalStateException(
-          s"Task $currentTaskAttemptId has already locked $blockId for writing")
-      } else {
-        while (info.writerTask != BlockInfo.NO_WRITER || info.readerCount != 0) {
-          if (blocking) wait() else return None
-        }
-        if (info.removed) return None
+    do {
+      infos.get(blockId) match {
+        case None => return None
+        case Some(info) =>
+          if (info.writerTask == currentTaskAttemptId) {
+            throw new IllegalStateException(
+              s"Task $currentTaskAttemptId has already locked $blockId for writing")
+          } else if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
+            info.writerTask = currentTaskAttemptId
+            writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
+            logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
+            return Some(info)
+          }
       }
-      info.writerTask = currentTaskAttemptId
-      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
-      logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
-      info
-    }
+      if (blocking) {
+        wait()
+      }
+    } while (blocking)
+    None
   }
 
   /**
@@ -306,29 +302,30 @@ private[storage] class BlockInfoManager extends Logging {
   }
 
   /**
-   * Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
-   * exist.
+   * Attempt to acquire the appropriate lock for writing a new block.
+   *
+   * This enforces the first-writer-wins semantics. If we are the first to write the block,
+   * then just go ahead and acquire the write lock. Otherwise, if another thread is already
+   * writing the block, then we wait for the write to finish before acquiring the read lock.
    *
-   * @param blockId the block id.
-   * @param newBlockInfo the block info for the new block.
    * @return true if the block did not already exist, false otherwise. If this returns false, then
-   *         no new locks are acquired. If this returns true, a write lock on the new block will
-   *         be held.
+   *         a read lock on the existing block will be held. If this returns true, a write lock on
+   *         the new block will be held.
    */
   def lockNewBlockForWriting(
       blockId: BlockId,
       newBlockInfo: BlockInfo): Boolean = synchronized {
     logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
-    if (!infos.contains(blockId)) {
-      infos(blockId) = newBlockInfo
-      newBlockInfo.writerTask = currentTaskAttemptId
-      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
-      logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
-      true
-    } else {
-      logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
-        s"because that block already exists")
-      false
+    lockForReading(blockId) match {
+      case Some(info) =>
+        // Block already exists. This could happen if another thread races with us to compute
+        // the same block. In this case, just keep the read lock and return.
+        false
+      case None =>
+        // Block does not yet exist or is removed, so we are free to acquire the write lock
+        infos(blockId) = newBlockInfo
+        lockForWriting(blockId)
+        true
     }
   }
 
@@ -418,7 +415,6 @@ private[storage] class BlockInfoManager extends Logging {
           infos.remove(blockId)
           blockInfo.readerCount = 0
           blockInfo.writerTask = BlockInfo.NO_WRITER
-          blockInfo.removed = true
         }
       case None =>
         throw new IllegalArgumentException(
@@ -434,7 +430,6 @@ private[storage] class BlockInfoManager extends Logging {
     infos.valuesIterator.foreach { blockInfo =>
       blockInfo.readerCount = 0
       blockInfo.writerTask = BlockInfo.NO_WRITER
-      blockInfo.removed = true
     }
     infos.clear()
     readLocksByTask.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 29124b3..b59191b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -44,8 +44,7 @@ import org.apache.spark.util._
 
 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
-private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
-private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
+private[spark] case class IteratorValues(iterator: () => Iterator[Any]) extends BlockValues
 
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
@@ -648,8 +647,38 @@ private[spark] class BlockManager(
   }
 
   /**
-   * @return true if the block was stored or false if the block was already stored or an
-   *         error occurred.
+   * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
+   * to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   *         could not be cached.
+   */
+  def getOrElseUpdate(
+      blockId: BlockId,
+      level: StorageLevel,
+      makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
+    // Initially we hold no locks on this block.
+    doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
+      case None =>
+        // doPut() didn't hand work back to us, so the block already existed or was successfully
+        // stored. Therefore, we now hold a read lock on the block.
+        val blockResult = get(blockId).getOrElse {
+          // Since we held a read lock between the doPut() and get() calls, the block should not
+          // have been evicted, so get() not returning the block indicates some internal error.
+          releaseLock(blockId)
+          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
+        }
+        Left(blockResult)
+      case Some(failedPutResult) =>
+        // The put failed, likely because the data was too large to fit in memory and could not be
+        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
+        // that they can decide what to do with the values (e.g. process them without caching).
+       Right(failedPutResult.data.left.get)
+    }
+  }
+
+  /**
+   * @return true if the block was stored or false if an error occurred.
    */
   def putIterator(
       blockId: BlockId,
@@ -658,7 +687,7 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(values != null, "Values is null")
-    doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
+    doPut(blockId, IteratorValues(() => values), level, tellMaster, effectiveStorageLevel).isEmpty
   }
 
   /**
@@ -679,26 +708,9 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Put a new block of values to the block manager.
-   *
-   * @return true if the block was stored or false if the block was already stored or an
-   *         error occurred.
-   */
-  def putArray(
-      blockId: BlockId,
-      values: Array[Any],
-      level: StorageLevel,
-      tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
-    require(values != null, "Values is null")
-    doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
-  }
-
-  /**
    * Put a new block of serialized bytes to the block manager.
    *
-   * @return true if the block was stored or false if the block was already stored or an
-   *         error occurred.
+   * @return true if the block was stored or false if an error occurred.
    */
   def putBytes(
       blockId: BlockId,
@@ -707,26 +719,32 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
+    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel).isEmpty
   }
 
   /**
    * Put the given block according to the given level in one of the block stores, replicating
    * the values if necessary.
    *
-   * The effective storage level refers to the level according to which the block will actually be
-   * handled. This allows the caller to specify an alternate behavior of doPut while preserving
-   * the original level specified by the user.
+   * If the block already exists, this method will not overwrite it.
    *
-   * @return true if the block was stored or false if the block was already stored or an
-   *         error occurred.
+   * @param effectiveStorageLevel the level according to which the block will actually be handled.
+   *                              This allows the caller to specify an alternate behavior of doPut
+   *                              while preserving the original level specified by the user.
+   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
+   *                     block already exists). If false, this method will hold no locks when it
+   *                     returns.
+   * @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
+   *         or None if the block already existed or was successfully stored (fully consuming
+   *         the input data / input iterator).
    */
   private def doPut(
       blockId: BlockId,
       data: BlockValues,
       level: StorageLevel,
       tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+      effectiveStorageLevel: Option[StorageLevel] = None,
+      keepReadLock: Boolean = false): Option[PutResult] = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -743,7 +761,11 @@ private[spark] class BlockManager(
         newInfo
       } else {
         logWarning(s"Block $blockId already exists on this machine; not re-adding it")
-        return false
+        if (!keepReadLock) {
+          // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
+          releaseLock(blockId)
+        }
+        return None
       }
     }
 
@@ -779,6 +801,7 @@ private[spark] class BlockManager(
     }
 
     var blockWasSuccessfullyStored = false
+    var result: PutResult = null
 
     putBlockInfo.synchronized {
       logTrace("Put for block %s took %s to get into synchronized block"
@@ -803,11 +826,9 @@ private[spark] class BlockManager(
         }
 
         // Actually put the values
-        val result = data match {
+        result = data match {
           case IteratorValues(iterator) =>
-            blockStore.putIterator(blockId, iterator, putLevel, returnValues)
-          case ArrayValues(array) =>
-            blockStore.putArray(blockId, array, putLevel, returnValues)
+            blockStore.putIterator(blockId, iterator(), putLevel, returnValues)
           case ByteBufferValues(bytes) =>
             bytes.rewind()
             blockStore.putBytes(blockId, bytes, putLevel)
@@ -834,7 +855,11 @@ private[spark] class BlockManager(
         }
       } finally {
         if (blockWasSuccessfullyStored) {
-          blockInfoManager.downgradeLock(blockId)
+          if (keepReadLock) {
+            blockInfoManager.downgradeLock(blockId)
+          } else {
+            blockInfoManager.unlock(blockId)
+          }
         } else {
           blockInfoManager.removeBlock(blockId)
           logWarning(s"Putting block $blockId failed")
@@ -852,18 +877,20 @@ private[spark] class BlockManager(
             Await.ready(replicationFuture, Duration.Inf)
           }
         case _ =>
-          val remoteStartTime = System.currentTimeMillis
-          // Serialize the block if not already done
-          if (bytesAfterPut == null) {
-            if (valuesAfterPut == null) {
-              throw new SparkException(
-                "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
+          if (blockWasSuccessfullyStored) {
+            val remoteStartTime = System.currentTimeMillis
+            // Serialize the block if not already done
+            if (bytesAfterPut == null) {
+              if (valuesAfterPut == null) {
+                throw new SparkException(
+                  "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
+              }
+              bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
             }
-            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
+            replicate(blockId, bytesAfterPut, putLevel)
+            logDebug("Put block %s remotely took %s"
+              .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
           }
-          replicate(blockId, bytesAfterPut, putLevel)
-          logDebug("Put block %s remotely took %s"
-            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
       }
     }
 
@@ -877,7 +904,11 @@ private[spark] class BlockManager(
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     }
 
-    blockWasSuccessfullyStored
+    if (blockWasSuccessfullyStored) {
+      None
+    } else {
+      Some(result)
+    }
   }
 
   /**
@@ -1033,7 +1064,7 @@ private[spark] class BlockManager(
       logInfo(s"Writing block $blockId to disk")
       data() match {
         case Left(elements) =>
-          diskStore.putArray(blockId, elements, level, returnValues = false)
+          diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false)
         case Right(bytes) =>
           diskStore.putBytes(blockId, bytes, level)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index 6f6a677..d3af50d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -19,8 +19,6 @@ package org.apache.spark.storage
 
 import java.nio.ByteBuffer
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.Logging
 
 /**
@@ -43,12 +41,6 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
     level: StorageLevel,
     returnValues: Boolean): PutResult
 
-  def putArray(
-    blockId: BlockId,
-    values: Array[Any],
-    level: StorageLevel,
-    returnValues: Boolean): PutResult
-
   /**
    * Return the size of a block in bytes.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 1f3f193..bfa6560 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -58,14 +58,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
-  override def putArray(
-      blockId: BlockId,
-      values: Array[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    putIterator(blockId, values.toIterator, level, returnValues)
-  }
-
   override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 2f16c8f..317d73a 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -120,22 +120,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     PutResult(size, data)
   }
 
-  override def putArray(
-      blockId: BlockId,
-      values: Array[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    if (level.deserialized) {
-      val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
-      tryToPut(blockId, () => values, sizeEstimate, deserialized = true)
-      PutResult(sizeEstimate, Left(values.iterator))
-    } else {
-      val bytes = blockManager.dataSerialize(blockId, values.iterator)
-      tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-      PutResult(bytes.limit(), Right(bytes.duplicate()))
-    }
-  }
-
   override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
@@ -166,7 +150,17 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     unrolledValues match {
       case Left(arrayValues) =>
         // Values are fully unrolled in memory, so store them as an array
-        val res = putArray(blockId, arrayValues, level, returnValues)
+        val res = {
+          if (level.deserialized) {
+            val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
+            tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
+            PutResult(sizeEstimate, Left(arrayValues.iterator))
+          } else {
+            val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
+            tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
+            PutResult(bytes.limit(), Right(bytes.duplicate()))
+          }
+        }
         PutResult(res.size, res.data)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
deleted file mode 100644
index ffc02bc..0000000
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage._
-
-// TODO: Test the CacheManager's thread-safety aspects
-class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter
-  with MockitoSugar {
-
-  var blockManager: BlockManager = _
-  var cacheManager: CacheManager = _
-  var split: Partition = _
-  /** An RDD which returns the values [1, 2, 3, 4]. */
-  var rdd: RDD[Int] = _
-  var rdd2: RDD[Int] = _
-  var rdd3: RDD[Int] = _
-
-  before {
-    sc = new SparkContext("local", "test")
-    blockManager = mock[BlockManager]
-    cacheManager = new CacheManager(blockManager)
-    split = new Partition { override def index: Int = 0 }
-    rdd = new RDD[Int](sc, Nil) {
-      override def getPartitions: Array[Partition] = Array(split)
-      override val getDependencies = List[Dependency[_]]()
-      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
-        Array(1, 2, 3, 4).iterator
-    }
-    rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
-      override def getPartitions: Array[Partition] = firstParent[Int].partitions
-      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
-        firstParent[Int].iterator(split, context)
-    }.cache()
-    rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
-      override def getPartitions: Array[Partition] = firstParent[Int].partitions
-      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
-        firstParent[Int].iterator(split, context)
-    }.cache()
-  }
-
-  test("get uncached rdd") {
-    // Do not mock this test, because attempting to match Array[Any], which is not covariant,
-    // in blockManager.put is a losing battle. You have been warned.
-    blockManager = sc.env.blockManager
-    cacheManager = sc.env.cacheManager
-    val context = TaskContext.empty()
-    val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
-    val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
-    assert(computeValue.toList === List(1, 2, 3, 4))
-    assert(getValue.isDefined, "Block cached from getOrCompute is not found!")
-    assert(getValue.get.data.toList === List(1, 2, 3, 4))
-  }
-
-  test("get cached rdd") {
-    val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
-    when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
-
-    val context = TaskContext.empty()
-    val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
-    assert(value.toList === List(5, 6, 7))
-  }
-
-  test("verify task metrics updated correctly") {
-    cacheManager = sc.env.cacheManager
-    val context = TaskContext.empty()
-    try {
-      TaskContext.setTaskContext(context)
-      sc.env.blockManager.registerTask(0)
-      cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
-      assert(context.taskMetrics.updatedBlockStatuses.size === 2)
-    } finally {
-      TaskContext.unset()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 662b18f..fe83fc7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -80,10 +80,18 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     withTaskId(1) {
       assert(blockInfoManager.lockNewBlockForWriting("block", blockInfo))
       assert(blockInfoManager.get("block").get eq blockInfo)
-      assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
-      assert(blockInfoManager.get("block").get eq blockInfo)
       assert(blockInfo.readerCount === 0)
       assert(blockInfo.writerTask === 1)
+      // Downgrade lock so that second call doesn't block:
+      blockInfoManager.downgradeLock("block")
+      assert(blockInfo.readerCount === 1)
+      assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+      assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      assert(blockInfo.readerCount === 2)
+      assert(blockInfoManager.get("block").get eq blockInfo)
+      assert(blockInfo.readerCount === 2)
+      assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+      blockInfoManager.unlock("block")
       blockInfoManager.unlock("block")
       assert(blockInfo.readerCount === 0)
       assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
@@ -92,6 +100,67 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 1)
   }
 
+  test("lockNewBlockForWriting blocks while write lock is held, then returns false after release") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    val lock1Future = Future {
+      withTaskId(1) {
+        blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+      }
+    }
+    val lock2Future = Future {
+      withTaskId(2) {
+        blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+      }
+    }
+    Thread.sleep(300)  // Hack to try to ensure that both future tasks are waiting
+    withTaskId(0) {
+      blockInfoManager.downgradeLock("block")
+    }
+    // After downgrading to a read lock, both threads should wake up and acquire the shared
+    // read lock.
+    assert(!Await.result(lock1Future, 1.seconds))
+    assert(!Await.result(lock2Future, 1.seconds))
+    assert(blockInfoManager.get("block").get.readerCount === 3)
+  }
+
+  test("lockNewBlockForWriting blocks while write lock is held, then returns true after removal") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    val lock1Future = Future {
+      withTaskId(1) {
+        blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+      }
+    }
+    val lock2Future = Future {
+      withTaskId(2) {
+        blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+      }
+    }
+    Thread.sleep(300)  // Hack to try to ensure that both future tasks are waiting
+    withTaskId(0) {
+      blockInfoManager.removeBlock("block")
+    }
+    // After removing the block, the write lock is released. Both threads should wake up but only
+    // one should acquire the write lock. The second thread should block until the winner of the
+    // write race releases its lock.
+    val winningFuture: Future[Boolean] =
+      Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
+    assert(winningFuture.value.get.get)
+    val winningTID = blockInfoManager.get("block").get.writerTask
+    assert(winningTID === 1 || winningTID === 2)
+    val losingFuture: Future[Boolean] = if (winningTID == 1) lock2Future else lock1Future
+    assert(!losingFuture.isCompleted)
+    // Once the writer releases its lock, the blocked future should wake up again and complete.
+    withTaskId(winningTID) {
+      blockInfoManager.unlock("block")
+    }
+    assert(!Await.result(losingFuture, 1.seconds))
+    assert(blockInfoManager.get("block").get.readerCount === 1)
+  }
+
   test("read locks are reentrant") {
     withTaskId(1) {
       assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index a94d8b4..ae1faf5 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -190,7 +190,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
 
     def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
       stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
-      stores.head.releaseLock(blockId)
       val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
       stores.foreach { _.removeBlock(blockId) }
       master.removeBlock(blockId)
@@ -252,7 +251,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     // Insert a block with 2x replication and return the number of copies of the block
     def replicateAndGetNumCopies(blockId: String): Int = {
       store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
-      store.releaseLock(blockId)
       val numLocations = master.getLocations(blockId).size
       allStores.foreach { _.removeBlock(blockId) }
       numLocations
@@ -290,7 +288,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
       val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
       initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
-      initialStores.head.releaseLock(blockId)
       val numLocations = master.getLocations(blockId).size
       allStores.foreach { _.removeBlock(blockId) }
       numLocations
@@ -358,7 +355,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
       val blockId = new TestBlockId(
         "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
       stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
-      stores(0).releaseLock(blockId)
 
       // Assert that master know two locations for the block
       val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -397,7 +393,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
           (1 to 10).foreach {
             i =>
               testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
-              testStore.releaseLock(s"dummy-block-$i")
           }
           (1 to 10).foreach {
             i => testStore.removeBlock(s"dummy-block-$i")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org