You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/02/26 02:18:01 UTC

[1/2] spark git commit: [SPARK-12757] Add block-level read/write locks to BlockManager

Repository: spark
Updated Branches:
  refs/heads/master 712995757 -> 633d63a48


http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e1b2c96..e4ab9ee 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -45,6 +45,8 @@ import org.apache.spark.util._
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   with PrivateMethodTester with ResetSystemProperties {
 
+  import BlockManagerSuite._
+
   var conf: SparkConf = null
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -66,6 +68,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       maxMem: Long,
       name: String = SparkContext.DRIVER_IDENTIFIER,
       master: BlockManagerMaster = this.master): BlockManager = {
+    val serializer = new KryoSerializer(conf)
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
     val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
@@ -169,14 +172,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
 
     // Checking whether master knows about the blocks or not
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -184,10 +187,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("a3").size === 0, "master was told about a3")
 
     // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
-    store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
-    assert(store.getSingle("a1") === None, "a1 not removed from store")
-    assert(store.getSingle("a2") === None, "a2 not removed from store")
+    store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
+    assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store")
+    assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store")
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
     assert(master.getLocations("a2").size === 0, "master did not remove a2")
   }
@@ -202,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
     store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
     assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
     assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
@@ -215,17 +218,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
-    store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
     assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
     assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
-    assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store")
 
     // Checking whether master knows about the blocks or not
     assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
@@ -238,15 +241,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeBlock("a3-to-remove")
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("a1-to-remove") should be (None)
+      assert(!store.hasLocalBlock("a1-to-remove"))
       master.getLocations("a1-to-remove") should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("a2-to-remove") should be (None)
+      assert(!store.hasLocalBlock("a2-to-remove"))
       master.getLocations("a2-to-remove") should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("a3-to-remove") should not be (None)
+      assert(store.hasLocalBlock("a3-to-remove"))
       master.getLocations("a3-to-remove") should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
@@ -262,30 +265,30 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
     // Putting a1, a2 and a3 in memory.
-    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = false)
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle(rdd(0, 0)) should be (None)
+      store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
       master.getLocations(rdd(0, 0)) should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle(rdd(0, 1)) should be (None)
+      store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
       master.getLocations(rdd(0, 1)) should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("nonrddblock") should not be (None)
+      store.getSingleAndReleaseLock("nonrddblock") should not be (None)
       master.getLocations("nonrddblock") should have size (1)
     }
 
-    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = true)
-    store.getSingle(rdd(0, 0)) should be (None)
+    store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
     master.getLocations(rdd(0, 0)) should have size 0
-    store.getSingle(rdd(0, 1)) should be (None)
+    store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
     master.getLocations(rdd(0, 1)) should have size 0
   }
 
@@ -305,54 +308,54 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // insert broadcast blocks in both the stores
     Seq(driverStore, executorStore).foreach { case s =>
-      s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
-      s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
-      s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
-      s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
     }
 
     // verify whether the blocks exist in both the stores
     Seq(driverStore, executorStore).foreach { case s =>
-      s.getLocal(broadcast0BlockId) should not be (None)
-      s.getLocal(broadcast1BlockId) should not be (None)
-      s.getLocal(broadcast2BlockId) should not be (None)
-      s.getLocal(broadcast2BlockId2) should not be (None)
+      assert(s.hasLocalBlock(broadcast0BlockId))
+      assert(s.hasLocalBlock(broadcast1BlockId))
+      assert(s.hasLocalBlock(broadcast2BlockId))
+      assert(s.hasLocalBlock(broadcast2BlockId2))
     }
 
     // remove broadcast 0 block only from executors
     master.removeBroadcast(0, removeFromMaster = false, blocking = true)
 
     // only broadcast 0 block should be removed from the executor store
-    executorStore.getLocal(broadcast0BlockId) should be (None)
-    executorStore.getLocal(broadcast1BlockId) should not be (None)
-    executorStore.getLocal(broadcast2BlockId) should not be (None)
+    assert(!executorStore.hasLocalBlock(broadcast0BlockId))
+    assert(executorStore.hasLocalBlock(broadcast1BlockId))
+    assert(executorStore.hasLocalBlock(broadcast2BlockId))
 
     // nothing should be removed from the driver store
-    driverStore.getLocal(broadcast0BlockId) should not be (None)
-    driverStore.getLocal(broadcast1BlockId) should not be (None)
-    driverStore.getLocal(broadcast2BlockId) should not be (None)
+    assert(driverStore.hasLocalBlock(broadcast0BlockId))
+    assert(driverStore.hasLocalBlock(broadcast1BlockId))
+    assert(driverStore.hasLocalBlock(broadcast2BlockId))
 
     // remove broadcast 0 block from the driver as well
     master.removeBroadcast(0, removeFromMaster = true, blocking = true)
-    driverStore.getLocal(broadcast0BlockId) should be (None)
-    driverStore.getLocal(broadcast1BlockId) should not be (None)
+    assert(!driverStore.hasLocalBlock(broadcast0BlockId))
+    assert(driverStore.hasLocalBlock(broadcast1BlockId))
 
     // remove broadcast 1 block from both the stores asynchronously
     // and verify all broadcast 1 blocks have been removed
     master.removeBroadcast(1, removeFromMaster = true, blocking = false)
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      driverStore.getLocal(broadcast1BlockId) should be (None)
-      executorStore.getLocal(broadcast1BlockId) should be (None)
+      assert(!driverStore.hasLocalBlock(broadcast1BlockId))
+      assert(!executorStore.hasLocalBlock(broadcast1BlockId))
     }
 
     // remove broadcast 2 from both the stores asynchronously
     // and verify all broadcast 2 blocks have been removed
     master.removeBroadcast(2, removeFromMaster = true, blocking = false)
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      driverStore.getLocal(broadcast2BlockId) should be (None)
-      driverStore.getLocal(broadcast2BlockId2) should be (None)
-      executorStore.getLocal(broadcast2BlockId) should be (None)
-      executorStore.getLocal(broadcast2BlockId2) should be (None)
+      assert(!driverStore.hasLocalBlock(broadcast2BlockId))
+      assert(!driverStore.hasLocalBlock(broadcast2BlockId2))
+      assert(!executorStore.hasLocalBlock(broadcast2BlockId))
+      assert(!executorStore.hasLocalBlock(broadcast2BlockId2))
     }
     executorStore.stop()
     driverStore.stop()
@@ -363,9 +366,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
 
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
     master.removeExecutor(store.blockManagerId.executorId)
@@ -381,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
 
     assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -404,12 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       master.removeExecutor(store.blockManagerId.executorId)
       val t1 = new Thread {
         override def run() {
-          store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+          store.putIteratorAndReleaseLock(
+            "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
         }
       }
       val t2 = new Thread {
         override def run() {
-          store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+          store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
         }
       }
       val t3 = new Thread {
@@ -425,8 +429,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       t2.join()
       t3.join()
 
-      store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
-      store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
       store.waitForAsyncReregister()
     }
   }
@@ -437,9 +441,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
     val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
     val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
@@ -479,8 +486,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store2 = makeBlockManager(8000, "executor2")
     store3 = makeBlockManager(8000, "executor3")
     val list1 = List(new Array[Byte](4000))
-    store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store2.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store3.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
     store2.stop()
     store2 = null
@@ -506,18 +515,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingle("a1", a1, storageLevel)
-    store.putSingle("a2", a2, storageLevel)
-    store.putSingle("a3", a3, storageLevel)
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
-    assert(store.getSingle("a1") === None, "a1 was in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    store.putSingleAndReleaseLock("a2", a2, storageLevel)
+    store.putSingleAndReleaseLock("a3", a3, storageLevel)
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
-    store.putSingle("a1", a1, storageLevel)
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3") === None, "a3 was in store")
+    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
   }
 
   test("in-memory LRU for partitions of same RDD") {
@@ -525,34 +534,34 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
     // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
     // from the same RDD
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
-    assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
-    assert(store.getSingle(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
     // Check that rdd_0_3 doesn't replace them even after further accesses
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
     store = makeBlockManager(12000)
-    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // At this point rdd_1_1 should've replaced rdd_0_1
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
     assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
     // Do a get() on rdd_0_2 so that it is the most recently used item
-    assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
-    store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
     assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -567,28 +576,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
-    store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
-    store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
-    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
-    assert(store.getSingle("a2").isDefined, "a2 was in store")
-    assert(store.getSingle("a3").isDefined, "a3 was in store")
-    assert(store.getSingle("a1").isDefined, "a1 was in store")
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY)
+    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
   }
 
   test("disk and memory storage") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingle)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingleAndReleaseLock)
   }
 
   test("disk and memory storage with getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytes)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytesAndReleaseLock)
   }
 
   test("disk and memory storage with serialization") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingle)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingleAndReleaseLock)
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytes)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytesAndReleaseLock)
   }
 
   def testDiskAndMemoryStorage(
@@ -598,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingle("a1", a1, storageLevel)
-    store.putSingle("a2", a2, storageLevel)
-    store.putSingle("a3", a3, storageLevel)
+    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    store.putSingleAndReleaseLock("a2", a2, storageLevel)
+    store.putSingleAndReleaseLock("a3", a3, storageLevel)
     assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
     assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
     assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
@@ -615,19 +624,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
     val a4 = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
     // At this point LRU should not kick in because a3 is only on disk
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
     // Now let's add in a4, which uses both disk and memory; a1 should drop out
-    store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
-    assert(store.getSingle("a1") == None, "a1 was in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
-    assert(store.getSingle("a4").isDefined, "a4 was not in store")
+    store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+    assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a4").isDefined, "a4 was not in store")
   }
 
   test("in-memory LRU with streams") {
@@ -635,23 +644,27 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
-    assert(store.get("list1") === None, "list1 was in store")
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    assert(store.get("list1").isDefined, "list1 was not in store")
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3") === None, "list1 was in store")
+    assert(store.getAndReleaseLock("list3") === None, "list1 was in store")
   }
 
   test("LRU with mixed storage levels and streams") {
@@ -661,33 +674,37 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val listForSizeEstimate = new ArrayBuffer[Any]
     listForSizeEstimate ++= list1.iterator
     val listSize = SizeEstimator.estimate(listForSizeEstimate)
     // At this point LRU should not kick in because list3 is only on disk
-    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
-    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
-    assert(store.get("list1") === None, "list1 was in store")
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    store.putIteratorAndReleaseLock(
+      "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+    assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
-    assert(store.get("list4").isDefined, "list4 was not in store")
+    assert(store.getAndReleaseLock("list4").isDefined, "list4 was not in store")
     assert(store.get("list4").get.data.size === 2)
   }
 
@@ -705,18 +722,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("overly large block") {
     store = makeBlockManager(5000)
-    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
-    assert(store.getSingle("a1") === None, "a1 was in store")
-    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
+    store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+    assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
+    store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
     assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
   }
 
   test("block compression") {
     try {
       conf.set("spark.shuffle.compress", "true")
       store = makeBlockManager(20000, "exec1")
-      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
       store.stop()
@@ -724,7 +742,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.shuffle.compress", "false")
       store = makeBlockManager(20000, "exec2")
-      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
       store.stop()
@@ -732,7 +751,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "true")
       store = makeBlockManager(20000, "exec3")
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
       store.stop()
@@ -740,28 +760,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "false")
       store = makeBlockManager(20000, "exec4")
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "true")
       store = makeBlockManager(20000, "exec5")
-      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "false")
       store = makeBlockManager(20000, "exec6")
-      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
       store = makeBlockManager(20000, "exec7")
-      store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+      store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
       store.stop()
       store = null
@@ -789,12 +810,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     class UnserializableClass
     val a1 = new UnserializableClass
     intercept[java.io.NotSerializableException] {
-      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+      store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
     }
 
     // Make sure get a1 doesn't hang and returns None.
     failAfter(1 second) {
-      assert(store.getSingle("a1").isEmpty, "a1 should not be in store")
+      assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store")
     }
   }
 
@@ -844,6 +865,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("updated block statuses") {
     store = makeBlockManager(12000)
+    store.registerTask(0)
     val list = List.fill(2)(new Array[Byte](2000))
     val bigList = List.fill(8)(new Array[Byte](2000))
 
@@ -860,7 +882,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list1)
     val updatedBlocks1 = getUpdatedBlocks {
-      store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks1.size === 1)
     assert(updatedBlocks1.head._1 === TestBlockId("list1"))
@@ -868,7 +891,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list2)
     val updatedBlocks2 = getUpdatedBlocks {
-      store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     }
     assert(updatedBlocks2.size === 1)
     assert(updatedBlocks2.head._1 === TestBlockId("list2"))
@@ -876,7 +900,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list1 is kicked out of memory while list3 is added
     val updatedBlocks3 = getUpdatedBlocks {
-      store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks3.size === 2)
     updatedBlocks3.foreach { case (id, status) =>
@@ -890,7 +915,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
     val updatedBlocks4 = getUpdatedBlocks {
-      store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks4.size === 2)
     updatedBlocks4.foreach { case (id, status) =>
@@ -905,7 +931,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // No updated blocks - list5 is too big to fit in store and nothing is kicked out
     val updatedBlocks5 = getUpdatedBlocks {
-      store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks5.size === 0)
 
@@ -929,9 +956,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
-    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getLocations("list1").size === 0)
@@ -945,9 +975,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
 
     // This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
-    store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
-    store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // getLocations should return nothing because the master is not informed
     // getBlockStatus without asking slaves should have the same result
@@ -968,9 +1001,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
-    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size
@@ -979,9 +1015,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       === 1)
 
     // insert some more blocks
-    store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size
@@ -991,7 +1030,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
-      store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
       case RDDBlockId(1, _) => true
@@ -1002,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
     store = makeBlockManager(12000)
-    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
-    assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
     // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // rdd_1_0 should have been replaced, even it's not least recently used.
     assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
     assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
@@ -1086,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     memoryStore.releasePendingUnrollMemoryForThisTask()
 
     // Unroll with not enough space. This should succeed after kicking out someBlock1.
-    store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
@@ -1098,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
     // In the mean time, however, we kicked out someBlock2 before giving up.
-    store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
     verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
@@ -1130,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // would not know how to drop them from memory later.
     memoryStore.remove("b1")
     memoryStore.remove("b2")
-    store.putIterator("b1", smallIterator, memOnly)
-    store.putIterator("b2", smallIterator, memOnly)
+    store.putIteratorAndReleaseLock("b1", smallIterator, memOnly)
+    store.putIteratorAndReleaseLock("b2", smallIterator, memOnly)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
@@ -1142,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.contains("b3"))
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     memoryStore.remove("b3")
-    store.putIterator("b3", smallIterator, memOnly)
+    store.putIteratorAndReleaseLock("b3", smallIterator, memOnly)
 
     // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
     val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
@@ -1169,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    store.putIterator("b1", smallIterator, memAndDisk)
-    store.putIterator("b2", smallIterator, memAndDisk)
+    store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk)
+    store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     // Memory store should contain b2 and b3, while disk store should contain only b1
@@ -1183,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(!diskStore.contains("b2"))
     assert(!diskStore.contains("b3"))
     memoryStore.remove("b3")
-    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll huge block with not enough space. This should fail and drop the new block to disk
@@ -1244,6 +1284,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(12000)
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
+    store.blockInfoManager.lockNewBlockForWriting(
+      blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
     val result = memoryStore.putBytes(blockId, 13000, () => {
       fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
     })
@@ -1263,4 +1305,104 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(result.size === 10000)
     assert(result.data === Right(bytes))
   }
+
+  test("read-locked blocks cannot be evicted from the MemoryStore") {
+    store = makeBlockManager(12000)
+    val arr = new Array[Byte](4000)
+    // First store a1 and a2, both in memory, and a3, on disk only
+    store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    // This put should fail because both a1 and a2 should be read-locked:
+    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a3").isEmpty, "a3 was in store")
+    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    // Release both pins of block a2:
+    store.releaseLock("a2")
+    store.releaseLock("a2")
+    // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
+    // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
+    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a2").isEmpty, "a2 was in store")
+    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingle("a3").isDefined, "a3 was not in store")
+  }
+}
+
+private object BlockManagerSuite {
+
+  private implicit class BlockManagerTestUtils(store: BlockManager) {
+
+    def putSingleAndReleaseLock(
+        block: BlockId,
+        value: Any,
+        storageLevel: StorageLevel,
+        tellMaster: Boolean): Unit = {
+      if (store.putSingle(block, value, storageLevel, tellMaster)) {
+        store.releaseLock(block)
+      }
+    }
+
+    def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = {
+      if (store.putSingle(block, value, storageLevel)) {
+        store.releaseLock(block)
+      }
+    }
+
+    def putIteratorAndReleaseLock(
+        blockId: BlockId,
+        values: Iterator[Any],
+        level: StorageLevel): Unit = {
+      if (store.putIterator(blockId, values, level)) {
+        store.releaseLock(blockId)
+      }
+    }
+
+    def putIteratorAndReleaseLock(
+        blockId: BlockId,
+        values: Iterator[Any],
+        level: StorageLevel,
+        tellMaster: Boolean): Unit = {
+      if (store.putIterator(blockId, values, level, tellMaster)) {
+        store.releaseLock(blockId)
+      }
+    }
+
+    def dropFromMemoryIfExists(
+        blockId: BlockId,
+        data: () => Either[Array[Any], ByteBuffer]): Unit = {
+      store.blockInfoManager.lockForWriting(blockId).foreach { info =>
+        val newEffectiveStorageLevel = store.dropFromMemory(blockId, data)
+        if (newEffectiveStorageLevel.isValid) {
+          // The block is still present in at least one store, so release the lock
+          // but don't delete the block info
+          store.releaseLock(blockId)
+        } else {
+          // The block isn't present in any store, so delete the block info so that the
+          // block can be stored again
+          store.blockInfoManager.removeBlock(blockId)
+        }
+      }
+    }
+
+    private def wrapGet[T](f: BlockId => Option[T]): BlockId => Option[T] = (blockId: BlockId) => {
+      val result = f(blockId)
+      if (result.isDefined) {
+        store.releaseLock(blockId)
+      }
+      result
+    }
+
+    def hasLocalBlock(blockId: BlockId): Boolean = {
+      getLocalAndReleaseLock(blockId).isDefined
+    }
+
+    val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocal)
+    val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
+    val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
+    val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
index f55b884..631d767 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
@@ -28,7 +28,7 @@ import io.netty.buffer.Unpooled;
 /**
  * A {@link ManagedBuffer} backed by {@link ByteBuffer}.
  */
-public final class NioManagedBuffer extends ManagedBuffer {
+public class NioManagedBuffer extends ManagedBuffer {
   private final ByteBuffer buf;
 
   public NioManagedBuffer(ByteBuffer buf) {

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 83d7953..efa2eea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -46,7 +46,9 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   def isMaterialized(rddId: Int): Boolean = {
-    sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
+    val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
+    maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
+    maybeBlock.nonEmpty
   }
 
   test("withColumn doesn't invalidate cached dataframe") {

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 11863ca..86f02e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -40,7 +40,9 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
   }
 
   def isMaterialized(rddId: Int): Boolean = {
-    sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
+    val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
+    maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
+    maybeBlock.nonEmpty
   }
 
   test("cache table") {

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index e22e320..3d9c085 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -91,6 +91,8 @@ private[streaming] class BlockManagerBasedBlockHandler(
     if (!putSucceeded) {
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
+    } else {
+      blockManager.releaseLock(blockId)
     }
     BlockManagerBasedStoreResult(blockId, numRecords)
   }
@@ -189,6 +191,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
       if (!putSucceeded) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
+      } else {
+        blockManager.releaseLock(blockId)
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-12757] Add block-level read/write locks to BlockManager

Posted by an...@apache.org.
[SPARK-12757] Add block-level read/write locks to BlockManager

## Motivation

As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage-collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults.

## Changes

### BlockInfoManager and reader/writer locks

This patch adds block-level read/write locks to the BlockManager. It introduces a new `BlockInfoManager` component, which is contained within the `BlockManager`, holds the `BlockInfo` objects that the `BlockManager` uses for tracking block metadata, and exposes APIs for locking blocks in either shared read or exclusive write modes.

`BlockManager`'s `get*()` and `put*()` methods now implicitly acquire the necessary locks. After a `get()` call successfully retrieves a block, that block is locked in a shared read mode. A `put()` call will block until it acquires an exclusive write lock. If the write succeeds, the write lock will be downgraded to a shared read lock before returning to the caller. This `put()` locking behavior allows us store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify `CacheManager` in the future (see #10748).

See `BlockInfoManagerSuite`'s test cases for a more detailed specification of the locking semantics.

### Auto-release of locks at the end of tasks

Our locking APIs support explicit release of locks (by calling `unlock()`), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit `close()` operator to signal that no more records will be consumed, operations like `take()` or `limit()` don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task.

To address this, `BlockInfoManager` uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from `TaskContext`. When a task finishes, code in `Executor` calls `BlockInfoManager.unlockAllLocksForTask(taskAttemptId)` to free locks.

### Locking and the MemoryStore

In order to prevent in-memory blocks from being evicted while they are being read, the `MemoryStore`'s `evictBlocksToFreeSpace()` method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed.

### Locking and remote block transfer

This patch makes small changes to to block transfer and network layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network layer ManagedBuffers.

## FAQ

- **Why not use Java's built-in [`ReadWriteLock`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html)?**

  Our locks operate on a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using `ReadWriteLock` would mean that we might call `unlock()` from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use [`StampedLock`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html) to work around this issue.

- **Why not detect "leaked" locks in tests?**:

  See above notes about `take()` and `limit`.

Author: Josh Rosen <jo...@databricks.com>

Closes #10705 from JoshRosen/pin-pages.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/633d63a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/633d63a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/633d63a4

Branch: refs/heads/master
Commit: 633d63a48ad98754dc7c56f9ac150fc2aa4e42c5
Parents: 7129957
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Feb 25 17:17:56 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Feb 25 17:17:56 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   |   6 +-
 .../spark/broadcast/TorrentBroadcast.scala      |  68 ++-
 .../org/apache/spark/executor/Executor.scala    |  18 +-
 .../apache/spark/network/BlockDataManager.scala |  10 +-
 .../network/netty/NettyBlockRpcServer.scala     |   6 +-
 .../scala/org/apache/spark/scheduler/Task.scala |   1 +
 .../org/apache/spark/storage/BlockInfo.scala    |  83 ---
 .../apache/spark/storage/BlockInfoManager.scala | 445 ++++++++++++++++
 .../org/apache/spark/storage/BlockManager.scala | 261 +++++-----
 .../storage/BlockManagerManagedBuffer.scala     |  48 ++
 .../org/apache/spark/storage/BlockStore.scala   |   2 +
 .../org/apache/spark/storage/MemoryStore.scala  |  47 +-
 .../org/apache/spark/CacheManagerSuite.scala    |   1 +
 .../org/apache/spark/ContextCleanerSuite.scala  |   3 +-
 .../KryoSerializerDistributedSuite.scala        |   6 +-
 .../spark/storage/BlockInfoManagerSuite.scala   | 287 +++++++++++
 .../storage/BlockManagerReplicationSuite.scala  |   6 +
 .../spark/storage/BlockManagerSuite.scala       | 510 ++++++++++++-------
 .../spark/network/buffer/NioManagedBuffer.java  |   2 +-
 .../org/apache/spark/sql/CachedTableSuite.scala |   4 +-
 .../spark/sql/hive/CachedTableSuite.scala       |   4 +-
 .../receiver/ReceivedBlockHandler.scala         |   4 +
 22 files changed, 1384 insertions(+), 438 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 1ec9ba7..2b456fa 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
+import org.apache.spark.util.CompletionIterator
 
 /**
  * Spark class responsible for passing RDDs partition contents to the BlockManager and making
@@ -47,6 +48,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         existingMetrics.incBytesReadInternal(blockResult.bytes)
 
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
+
         new InterruptibleIterator[T](context, iter) {
           override def next(): T = {
             existingMetrics.incRecordsReadInternal(1)
@@ -156,7 +158,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         case Left(arr) =>
           // We have successfully unrolled the entire partition, so cache it in memory
           blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
-          arr.iterator.asInstanceOf[Iterator[T]]
+          CompletionIterator[T, Iterator[T]](
+            arr.iterator.asInstanceOf[Iterator[T]],
+            blockManager.releaseLock(key))
         case Right(it) =>
           // There is not enough space to cache this partition in memory
           val returnValues = it.asInstanceOf[Iterator[T]]

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 9bd6972..c08f87a 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.Random
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
+import org.apache.spark._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
 import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ByteArrayChunkOutputStream
 
@@ -90,22 +90,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
 
   /**
    * Divide the object into multiple blocks and put those blocks in the block manager.
+   *
    * @param value the object to divide
    * @return number of blocks this broadcast variable is divided into
    */
   private def writeBlocks(value: T): Int = {
+    import StorageLevel._
     // Store a copy of the broadcast variable in the driver so that tasks run on the driver
     // do not create a duplicate copy of the broadcast variable's value.
-    SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
-      tellMaster = false)
+    val blockManager = SparkEnv.get.blockManager
+    if (blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
+      blockManager.releaseLock(broadcastId)
+    } else {
+      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+    }
     val blocks =
       TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
     blocks.zipWithIndex.foreach { case (block, i) =>
-      SparkEnv.get.blockManager.putBytes(
-        BroadcastBlockId(id, "piece" + i),
-        block,
-        StorageLevel.MEMORY_AND_DISK_SER,
-        tellMaster = true)
+      val pieceId = BroadcastBlockId(id, "piece" + i)
+      if (blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
+        blockManager.releaseLock(pieceId)
+      } else {
+        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      }
     }
     blocks.length
   }
@@ -127,16 +134,18 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
         // If we found the block from remote executors/driver's BlockManager, put the block
         // in this executor's BlockManager.
-        SparkEnv.get.blockManager.putBytes(
-          pieceId,
-          block,
-          StorageLevel.MEMORY_AND_DISK_SER,
-          tellMaster = true)
+        if (!bm.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(
+            s"Failed to store $pieceId of $broadcastId in local BlockManager")
+        }
         block
       }
       val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
         throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
+      // At this point we are guaranteed to hold a read lock, since we either got the block locally
+      // or stored the remotely-fetched block and automatically downgraded the write lock.
       blocks(pid) = block
+      releaseLock(pieceId)
     }
     blocks
   }
@@ -165,8 +174,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)
-      SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
+      val blockManager = SparkEnv.get.blockManager
+      blockManager.getLocal(broadcastId).map(_.data.next()) match {
         case Some(x) =>
+          releaseLock(broadcastId)
           x.asInstanceOf[T]
 
         case None =>
@@ -179,13 +190,36 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
             blocks, SparkEnv.get.serializer, compressionCodec)
           // Store the merged copy in BlockManager so other tasks on this executor don't
           // need to re-fetch it.
-          SparkEnv.get.blockManager.putSingle(
-            broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+          val storageLevel = StorageLevel.MEMORY_AND_DISK
+          if (blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
+            releaseLock(broadcastId)
+          } else {
+            throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+          }
           obj
       }
     }
   }
 
+  /**
+   * If running in a task, register the given block's locks for release upon task completion.
+   * Otherwise, if not running in a task then immediately release the lock.
+   */
+  private def releaseLock(blockId: BlockId): Unit = {
+    val blockManager = SparkEnv.get.blockManager
+    Option(TaskContext.get()) match {
+      case Some(taskContext) =>
+        taskContext.addTaskCompletionListener(_ => blockManager.releaseLock(blockId))
+      case None =>
+        // This should only happen on the driver, where broadcast variables may be accessed
+        // outside of running tasks (e.g. when computing rdd.partitions()). In order to allow
+        // broadcast variables to be garbage collected we need to free the reference here
+        // which is slightly unsafe but is technically okay because broadcast variables aren't
+        // stored off-heap.
+        blockManager.releaseLock(blockId)
+    }
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 00be3a2..a602fca 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -218,7 +218,9 @@ private[spark] class Executor(
           threwException = false
           res
         } finally {
+          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
           val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
+
           if (freedMemory > 0) {
             val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
             if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
@@ -227,6 +229,17 @@ private[spark] class Executor(
               logError(errMsg)
             }
           }
+
+          if (releasedLocks.nonEmpty) {
+            val errMsg =
+              s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
+                releasedLocks.mkString("[", ", ", "]")
+            if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
+              throw new SparkException(errMsg)
+            } else {
+              logError(errMsg)
+            }
+          }
         }
         val taskFinish = System.currentTimeMillis()
 
@@ -266,8 +279,11 @@ private[spark] class Executor(
             ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
           } else if (resultSize >= maxRpcMessageSize) {
             val blockId = TaskResultBlockId(taskId)
-            env.blockManager.putBytes(
+            val putSucceeded = env.blockManager.putBytes(
               blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
+            if (putSucceeded) {
+              env.blockManager.releaseLock(blockId)
+            }
             logInfo(
               s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
             ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index 1745d52..cc5e851 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -31,6 +31,14 @@ trait BlockDataManager {
 
   /**
    * Put the block locally, using the given storage level.
+   *
+   * Returns true if the block was stored and false if the put operation failed or the block
+   * already existed.
    */
-  def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit
+  def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean
+
+  /**
+   * Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
+   */
+  def releaseLock(blockId: BlockId): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index df8c21f..e4246df 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -65,7 +65,11 @@ class NettyBlockRpcServer(
         val level: StorageLevel =
           serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
         val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
-        blockManager.putBlockData(BlockId(uploadBlock.blockId), data, level)
+        val blockId = BlockId(uploadBlock.blockId)
+        val putSucceeded = blockManager.putBlockData(blockId, data, level)
+        if (putSucceeded) {
+          blockManager.releaseLock(blockId)
+        }
         responseContext.onSuccess(ByteBuffer.allocate(0))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index a49f371..5c68d00 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -64,6 +64,7 @@ private[spark] abstract class Task[T](
       taskAttemptId: Long,
       attemptNumber: Int,
       metricsSystem: MetricsSystem): T = {
+    SparkEnv.get.blockManager.registerTask(taskAttemptId)
     context = new TaskContextImpl(
       stageId,
       partitionId,

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
deleted file mode 100644
index 22fdf73..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.util.concurrent.ConcurrentHashMap
-
-private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
-  // To save space, 'pending' and 'failed' are encoded as special sizes:
-  @volatile var size: Long = BlockInfo.BLOCK_PENDING
-  private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
-  private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
-  private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)
-
-  setInitThread()
-
-  private def setInitThread() {
-    /* Set current thread as init thread - waitForReady will not block this thread
-     * (in case there is non trivial initialization which ends up calling waitForReady
-     * as part of initialization itself) */
-    BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
-  }
-
-  /**
-   * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
-   * Return true if the block is available, false otherwise.
-   */
-  def waitForReady(): Boolean = {
-    if (pending && initThread != Thread.currentThread()) {
-      synchronized {
-        while (pending) {
-          this.wait()
-        }
-      }
-    }
-    !failed
-  }
-
-  /** Mark this BlockInfo as ready (i.e. block is finished writing) */
-  def markReady(sizeInBytes: Long) {
-    require(sizeInBytes >= 0, s"sizeInBytes was negative: $sizeInBytes")
-    assert(pending)
-    size = sizeInBytes
-    BlockInfo.blockInfoInitThreads.remove(this)
-    synchronized {
-      this.notifyAll()
-    }
-  }
-
-  /** Mark this BlockInfo as ready but failed */
-  def markFailure() {
-    assert(pending)
-    size = BlockInfo.BLOCK_FAILED
-    BlockInfo.blockInfoInitThreads.remove(this)
-    synchronized {
-      this.notifyAll()
-    }
-  }
-}
-
-private object BlockInfo {
-  /* initThread is logically a BlockInfo field, but we store it here because
-   * it's only needed while this block is in the 'pending' state and we want
-   * to minimize BlockInfo's memory footprint. */
-  private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
-
-  private val BLOCK_PENDING: Long = -1L
-  private val BLOCK_FAILED: Long = -2L
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
new file mode 100644
index 0000000..0eda97e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.common.collect.ConcurrentHashMultiset
+
+import org.apache.spark.{Logging, SparkException, TaskContext}
+
+
+/**
+ * Tracks metadata for an individual block.
+ *
+ * Instances of this class are _not_ thread-safe and are protected by locks in the
+ * [[BlockInfoManager]].
+ *
+ * @param level the block's storage level. This is the requested persistence level, not the
+ *              effective storage level of the block (i.e. if this is MEMORY_AND_DISK, then this
+ *              does not imply that the block is actually resident in memory).
+ * @param tellMaster whether state changes for this block should be reported to the master. This
+ *                   is true for most blocks, but is false for broadcast blocks.
+ */
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
+
+  /**
+   * The size of the block (in bytes)
+   */
+  def size: Long = _size
+  def size_=(s: Long): Unit = {
+    _size = s
+    checkInvariants()
+  }
+  private[this] var _size: Long = 0
+
+  /**
+   * The number of times that this block has been locked for reading.
+   */
+  def readerCount: Int = _readerCount
+  def readerCount_=(c: Int): Unit = {
+    _readerCount = c
+    checkInvariants()
+  }
+  private[this] var _readerCount: Int = 0
+
+  /**
+   * The task attempt id of the task which currently holds the write lock for this block, or
+   * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
+   * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
+   */
+  def writerTask: Long = _writerTask
+  def writerTask_=(t: Long): Unit = {
+    _writerTask = t
+    checkInvariants()
+  }
+  private[this] var _writerTask: Long = 0
+
+  /**
+   * True if this block has been removed from the BlockManager and false otherwise.
+   * This field is used to communicate block deletion to blocked readers / writers (see its usage
+   * in [[BlockInfoManager]]).
+   */
+  def removed: Boolean = _removed
+  def removed_=(r: Boolean): Unit = {
+    _removed = r
+    checkInvariants()
+  }
+  private[this] var _removed: Boolean = false
+
+  private def checkInvariants(): Unit = {
+    // A block's reader count must be non-negative:
+    assert(_readerCount >= 0)
+    // A block is either locked for reading or for writing, but not for both at the same time:
+    assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
+    // If a block is removed then it is not locked:
+    assert(!_removed || (_readerCount == 0 && _writerTask == BlockInfo.NO_WRITER))
+  }
+
+  checkInvariants()
+}
+
+private[storage] object BlockInfo {
+
+  /**
+   * Special task attempt id constant used to mark a block's write lock as being unlocked.
+   */
+  val NO_WRITER: Long = -1
+
+  /**
+   * Special task attempt id constant used to mark a block's write lock as being held by
+   * a non-task thread (e.g. by a driver thread or by unit test code).
+   */
+  val NON_TASK_WRITER: Long = -1024
+}
+
+/**
+ * Component of the [[BlockManager]] which tracks metadata for blocks and manages block locking.
+ *
+ * The locking interface exposed by this class is readers-writer lock. Every lock acquisition is
+ * automatically associated with a running task and locks are automatically released upon task
+ * completion or failure.
+ *
+ * This class is thread-safe.
+ */
+private[storage] class BlockInfoManager extends Logging {
+
+  private type TaskAttemptId = Long
+
+  /**
+   * Used to look up metadata for individual blocks. Entries are added to this map via an atomic
+   * set-if-not-exists operation ([[lockNewBlockForWriting()]]) and are removed
+   * by [[removeBlock()]].
+   */
+  @GuardedBy("this")
+  private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]
+
+  /**
+   * Tracks the set of blocks that each task has locked for writing.
+   */
+  @GuardedBy("this")
+  private[this] val writeLocksByTask =
+    new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
+      with mutable.MultiMap[TaskAttemptId, BlockId]
+
+  /**
+   * Tracks the set of blocks that each task has locked for reading, along with the number of times
+   * that a block has been locked (since our read locks are re-entrant).
+   */
+  @GuardedBy("this")
+  private[this] val readLocksByTask =
+    new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
+
+  // ----------------------------------------------------------------------------------------------
+
+  // Initialization for special task attempt ids:
+  registerTask(BlockInfo.NON_TASK_WRITER)
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+   * Called at the start of a task in order to register that task with this [[BlockInfoManager]].
+   * This must be called prior to calling any other BlockInfoManager methods from that task.
+   */
+  def registerTask(taskAttemptId: TaskAttemptId): Unit = synchronized {
+    require(!readLocksByTask.contains(taskAttemptId),
+      s"Task attempt $taskAttemptId is already registered")
+    readLocksByTask(taskAttemptId) = ConcurrentHashMultiset.create()
+  }
+
+  /**
+   * Returns the current task's task attempt id (which uniquely identifies the task), or
+   * [[BlockInfo.NON_TASK_WRITER]] if called by a non-task thread.
+   */
+  private def currentTaskAttemptId: TaskAttemptId = {
+    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(BlockInfo.NON_TASK_WRITER)
+  }
+
+  /**
+   * Lock a block for reading and return its metadata.
+   *
+   * If another task has already locked this block for reading, then the read lock will be
+   * immediately granted to the calling task and its lock count will be incremented.
+   *
+   * If another task has locked this block for writing, then this call will block until the write
+   * lock is released or will return immediately if `blocking = false`.
+   *
+   * A single task can lock a block multiple times for reading, in which case each lock will need
+   * to be released separately.
+   *
+   * @param blockId the block to lock.
+   * @param blocking if true (default), this call will block until the lock is acquired. If false,
+   *                 this call will return immediately if the lock acquisition fails.
+   * @return None if the block did not exist or was removed (in which case no lock is held), or
+   *         Some(BlockInfo) (in which case the block is locked for reading).
+   */
+  def lockForReading(
+      blockId: BlockId,
+      blocking: Boolean = true): Option[BlockInfo] = synchronized {
+    logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
+    infos.get(blockId).map { info =>
+      while (info.writerTask != BlockInfo.NO_WRITER) {
+        if (blocking) wait() else return None
+      }
+      if (info.removed) return None
+      info.readerCount += 1
+      readLocksByTask(currentTaskAttemptId).add(blockId)
+      logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
+      info
+    }
+  }
+
+  /**
+   * Lock a block for writing and return its metadata.
+   *
+   * If another task has already locked this block for either reading or writing, then this call
+   * will block until the other locks are released or will return immediately if `blocking = false`.
+   *
+   * If this is called by a task which already holds the block's exclusive write lock, then this
+   * method will throw an exception.
+   *
+   * @param blockId the block to lock.
+   * @param blocking if true (default), this call will block until the lock is acquired. If false,
+   *                 this call will return immediately if the lock acquisition fails.
+   * @return None if the block did not exist or was removed (in which case no lock is held), or
+   *         Some(BlockInfo) (in which case the block is locked for writing).
+   */
+  def lockForWriting(
+      blockId: BlockId,
+      blocking: Boolean = true): Option[BlockInfo] = synchronized {
+    logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
+    infos.get(blockId).map { info =>
+      if (info.writerTask == currentTaskAttemptId) {
+        throw new IllegalStateException(
+          s"Task $currentTaskAttemptId has already locked $blockId for writing")
+      } else {
+        while (info.writerTask != BlockInfo.NO_WRITER || info.readerCount != 0) {
+          if (blocking) wait() else return None
+        }
+        if (info.removed) return None
+      }
+      info.writerTask = currentTaskAttemptId
+      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
+      logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
+      info
+    }
+  }
+
+  /**
+   * Throws an exception if the current task does not hold a write lock on the given block.
+   * Otherwise, returns the block's BlockInfo.
+   */
+  def assertBlockIsLockedForWriting(blockId: BlockId): BlockInfo = synchronized {
+    infos.get(blockId) match {
+      case Some(info) =>
+        if (info.writerTask != currentTaskAttemptId) {
+          throw new SparkException(
+            s"Task $currentTaskAttemptId has not locked block $blockId for writing")
+        } else {
+          info
+        }
+      case None =>
+        throw new SparkException(s"Block $blockId does not exist")
+    }
+  }
+
+  /**
+   * Get a block's metadata without acquiring any locks. This method is only exposed for use by
+   * [[BlockManager.getStatus()]] and should not be called by other code outside of this class.
+   */
+  private[storage] def get(blockId: BlockId): Option[BlockInfo] = synchronized {
+    infos.get(blockId)
+  }
+
+  /**
+   * Downgrades an exclusive write lock to a shared read lock.
+   */
+  def downgradeLock(blockId: BlockId): Unit = synchronized {
+    logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
+    val info = get(blockId).get
+    require(info.writerTask == currentTaskAttemptId,
+      s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
+        s" block $blockId")
+    unlock(blockId)
+    val lockOutcome = lockForReading(blockId, blocking = false)
+    assert(lockOutcome.isDefined)
+  }
+
+  /**
+   * Release a lock on the given block.
+   */
+  def unlock(blockId: BlockId): Unit = synchronized {
+    logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId")
+    val info = get(blockId).getOrElse {
+      throw new IllegalStateException(s"Block $blockId not found")
+    }
+    if (info.writerTask != BlockInfo.NO_WRITER) {
+      info.writerTask = BlockInfo.NO_WRITER
+      writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
+    } else {
+      assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
+      info.readerCount -= 1
+      val countsForTask = readLocksByTask(currentTaskAttemptId)
+      val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
+      assert(newPinCountForTask >= 0,
+        s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it")
+    }
+    notifyAll()
+  }
+
+  /**
+   * Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
+   * exist.
+   *
+   * @param blockId the block id.
+   * @param newBlockInfo the block info for the new block.
+   * @return true if the block did not already exist, false otherwise. If this returns false, then
+   *         no new locks are acquired. If this returns true, a write lock on the new block will
+   *         be held.
+   */
+  def lockNewBlockForWriting(
+      blockId: BlockId,
+      newBlockInfo: BlockInfo): Boolean = synchronized {
+    logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
+    if (!infos.contains(blockId)) {
+      infos(blockId) = newBlockInfo
+      newBlockInfo.writerTask = currentTaskAttemptId
+      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
+      logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
+      true
+    } else {
+      logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
+        s"because that block already exists")
+      false
+    }
+  }
+
+  /**
+   * Release all lock held by the given task, clearing that task's pin bookkeeping
+   * structures and updating the global pin counts. This method should be called at the
+   * end of a task (either by a task completion handler or in `TaskRunner.run()`).
+   *
+   * @return the ids of blocks whose pins were released
+   */
+  def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = {
+    val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
+
+    val readLocks = synchronized {
+      readLocksByTask.remove(taskAttemptId).get
+    }
+    val writeLocks = synchronized {
+      writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
+    }
+
+    for (blockId <- writeLocks) {
+      infos.get(blockId).foreach { info =>
+        assert(info.writerTask == taskAttemptId)
+        info.writerTask = BlockInfo.NO_WRITER
+      }
+      blocksWithReleasedLocks += blockId
+    }
+    readLocks.entrySet().iterator().asScala.foreach { entry =>
+      val blockId = entry.getElement
+      val lockCount = entry.getCount
+      blocksWithReleasedLocks += blockId
+      synchronized {
+        get(blockId).foreach { info =>
+          info.readerCount -= lockCount
+          assert(info.readerCount >= 0)
+        }
+      }
+    }
+
+    synchronized {
+      notifyAll()
+    }
+    blocksWithReleasedLocks
+  }
+
+  /**
+   * Returns the number of blocks tracked.
+   */
+  def size: Int = synchronized {
+    infos.size
+  }
+
+  /**
+   * Return the number of map entries in this pin counter's internal data structures.
+   * This is used in unit tests in order to detect memory leaks.
+   */
+  private[storage] def getNumberOfMapEntries: Long = synchronized {
+    size +
+      readLocksByTask.size +
+      readLocksByTask.map(_._2.size()).sum +
+      writeLocksByTask.size +
+      writeLocksByTask.map(_._2.size).sum
+  }
+
+  /**
+   * Returns an iterator over a snapshot of all blocks' metadata. Note that the individual entries
+   * in this iterator are mutable and thus may reflect blocks that are deleted while the iterator
+   * is being traversed.
+   */
+  def entries: Iterator[(BlockId, BlockInfo)] = synchronized {
+    infos.toArray.toIterator
+  }
+
+  /**
+   * Removes the given block and releases the write lock on it.
+   *
+   * This can only be called while holding a write lock on the given block.
+   */
+  def removeBlock(blockId: BlockId): Unit = synchronized {
+    logTrace(s"Task $currentTaskAttemptId trying to remove block $blockId")
+    infos.get(blockId) match {
+      case Some(blockInfo) =>
+        if (blockInfo.writerTask != currentTaskAttemptId) {
+          throw new IllegalStateException(
+            s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
+        } else {
+          infos.remove(blockId)
+          blockInfo.readerCount = 0
+          blockInfo.writerTask = BlockInfo.NO_WRITER
+          blockInfo.removed = true
+        }
+      case None =>
+        throw new IllegalArgumentException(
+          s"Task $currentTaskAttemptId called remove() on non-existent block $blockId")
+    }
+    notifyAll()
+  }
+
+  /**
+   * Delete all state. Called during shutdown.
+   */
+  def clear(): Unit = synchronized {
+    infos.valuesIterator.foreach { blockInfo =>
+      blockInfo.readerCount = 0
+      blockInfo.writerTask = BlockInfo.NO_WRITER
+      blockInfo.removed = true
+    }
+    infos.clear()
+    readLocksByTask.clear()
+    writeLocksByTask.clear()
+    notifyAll()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 77fd03a..29124b3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,9 +19,7 @@ package org.apache.spark.storage
 
 import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
-import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
@@ -77,7 +75,7 @@ private[spark] class BlockManager(
 
   val diskBlockManager = new DiskBlockManager(this, conf)
 
-  private val blockInfo = new ConcurrentHashMap[BlockId, BlockInfo]
+  private[storage] val blockInfoManager = new BlockInfoManager
 
   private val futureExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
@@ -223,8 +221,8 @@ private[spark] class BlockManager(
    * will be made then.
    */
   private def reportAllBlocks(): Unit = {
-    logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
-    for ((blockId, info) <- blockInfo.asScala) {
+    logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
+    for ((blockId, info) <- blockInfoManager.entries) {
       val status = getCurrentBlockStatus(blockId, info)
       if (!tryToReportBlockStatus(blockId, info, status)) {
         logError(s"Failed to report $blockId to master; giving up.")
@@ -286,7 +284,7 @@ private[spark] class BlockManager(
         .asInstanceOf[Option[ByteBuffer]]
       if (blockBytesOpt.isDefined) {
         val buffer = blockBytesOpt.get
-        new NioManagedBuffer(buffer)
+        new BlockManagerManagedBuffer(this, blockId, buffer)
       } else {
         throw new BlockNotFoundException(blockId.toString)
       }
@@ -296,7 +294,7 @@ private[spark] class BlockManager(
   /**
    * Put the block locally, using the given storage level.
    */
-  override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit = {
+  override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = {
     putBytes(blockId, data.nioByteBuffer(), level)
   }
 
@@ -305,7 +303,7 @@ private[spark] class BlockManager(
    * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
    */
   def getStatus(blockId: BlockId): Option[BlockStatus] = {
-    blockInfo.asScala.get(blockId).map { info =>
+    blockInfoManager.get(blockId).map { info =>
       val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
       val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
       BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
@@ -318,7 +316,12 @@ private[spark] class BlockManager(
    * may not know of).
    */
   def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
-    (blockInfo.asScala.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
+    // The `toArray` is necessary here in order to force the list to be materialized so that we
+    // don't try to serialize a lazy iterator when responding to client requests.
+    (blockInfoManager.entries.map(_._1) ++ diskBlockManager.getAllBlocks())
+      .filter(filter)
+      .toArray
+      .toSeq
   }
 
   /**
@@ -425,26 +428,11 @@ private[spark] class BlockManager(
   }
 
   private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
-    val info = blockInfo.get(blockId)
-    if (info != null) {
-      info.synchronized {
-        // Double check to make sure the block is still there. There is a small chance that the
-        // block has been removed by removeBlock (which also synchronizes on the blockInfo object).
-        // Note that this only checks metadata tracking. If user intentionally deleted the block
-        // on disk or from off heap storage without using removeBlock, this conditional check will
-        // still pass but eventually we will get an exception because we can't find the block.
-        if (blockInfo.asScala.get(blockId).isEmpty) {
-          logWarning(s"Block $blockId had been removed")
-          return None
-        }
-
-        // If another thread is writing the block, wait for it to become ready.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning(s"Block $blockId was marked as failure.")
-          return None
-        }
-
+    blockInfoManager.lockForReading(blockId) match {
+      case None =>
+        logDebug(s"Block $blockId was not found")
+        None
+      case Some(info) =>
         val level = info.level
         logDebug(s"Level for block $blockId is $level")
 
@@ -452,7 +440,10 @@ private[spark] class BlockManager(
         if (level.useMemory) {
           logDebug(s"Getting block $blockId from memory")
           val result = if (asBlockResult) {
-            memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
+            memoryStore.getValues(blockId).map { iter =>
+              val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+              new BlockResult(ci, DataReadMethod.Memory, info.size)
+            }
           } else {
             memoryStore.getBytes(blockId)
           }
@@ -470,6 +461,7 @@ private[spark] class BlockManager(
           val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
             case Some(b) => b
             case None =>
+              releaseLock(blockId)
               throw new BlockException(
                 blockId, s"Block $blockId not found on disk, though it should be")
           }
@@ -478,8 +470,9 @@ private[spark] class BlockManager(
           if (!level.useMemory) {
             // If the block shouldn't be stored in memory, we can just return it
             if (asBlockResult) {
-              return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
-                info.size))
+              val iter = CompletionIterator[Any, Iterator[Any]](
+                dataDeserialize(blockId, bytes), releaseLock(blockId))
+              return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
             } else {
               return Some(bytes)
             }
@@ -511,26 +504,34 @@ private[spark] class BlockManager(
                 // space to unroll the block. Either way, the put here should return an iterator.
                 putResult.data match {
                   case Left(it) =>
-                    return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+                    val ci = CompletionIterator[Any, Iterator[Any]](it, releaseLock(blockId))
+                    return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
                   case _ =>
                     // This only happens if we dropped the values back to disk (which is never)
                     throw new SparkException("Memory store did not return an iterator!")
                 }
               } else {
-                return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
+                val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId))
+                return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
               }
             }
           }
+        } else {
+          // This branch represents a case where the BlockInfoManager contained an entry for
+          // the block but the block could not be found in any of the block stores. This case
+          // should never occur, but for completeness's sake we address it here.
+          logError(
+            s"Block $blockId is supposedly stored locally but was not found in any block store")
+          releaseLock(blockId)
+          None
         }
-      }
-    } else {
-      logDebug(s"Block $blockId not registered locally")
     }
-    None
   }
 
   /**
    * Get block from remote block managers.
+   *
+   * This does not acquire a lock on this block in this JVM.
    */
   def getRemote(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting remote block $blockId")
@@ -597,6 +598,10 @@ private[spark] class BlockManager(
 
   /**
    * Get a block from the block manager (either local or remote).
+   *
+   * This acquires a read lock on the block if the block was stored locally and does not acquire
+   * any locks if the block was fetched from a remote block manager. The read lock will
+   * automatically be freed once the result's `data` iterator is fully consumed.
    */
   def get(blockId: BlockId): Option[BlockResult] = {
     val local = getLocal(blockId)
@@ -613,6 +618,36 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Downgrades an exclusive write lock to a shared read lock.
+   */
+  def downgradeLock(blockId: BlockId): Unit = {
+    blockInfoManager.downgradeLock(blockId)
+  }
+
+  /**
+   * Release a lock on the given block.
+   */
+  def releaseLock(blockId: BlockId): Unit = {
+    blockInfoManager.unlock(blockId)
+  }
+
+  /**
+   * Registers a task with the BlockManager in order to initialize per-task bookkeeping structures.
+   */
+  def registerTask(taskAttemptId: Long): Unit = {
+    blockInfoManager.registerTask(taskAttemptId)
+  }
+
+  /**
+   * Release all locks for the given task.
+   *
+   * @return the blocks whose locks were released.
+   */
+  def releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId] = {
+    blockInfoManager.releaseAllLocksForTask(taskAttemptId)
+  }
+
+  /**
    * @return true if the block was stored or false if the block was already stored or an
    *         error occurred.
    */
@@ -703,19 +738,12 @@ private[spark] class BlockManager(
      * to be dropped right after it got put into memory. Note, however, that other threads will
      * not be able to get() this block until we call markReady on its BlockInfo. */
     val putBlockInfo = {
-      val tinfo = new BlockInfo(level, tellMaster)
-      // Do atomically !
-      val oldBlockOpt = Option(blockInfo.putIfAbsent(blockId, tinfo))
-      if (oldBlockOpt.isDefined) {
-        if (oldBlockOpt.get.waitForReady()) {
-          logWarning(s"Block $blockId already exists on this machine; not re-adding it")
-          return false
-        }
-        // TODO: So the block info exists - but previous attempt to load it (?) failed.
-        // What do we do now ? Retry on it ?
-        oldBlockOpt.get
+      val newInfo = new BlockInfo(level, tellMaster)
+      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+        newInfo
       } else {
-        tinfo
+        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
+        return false
       }
     }
 
@@ -750,7 +778,7 @@ private[spark] class BlockManager(
       case _ => null
     }
 
-    var marked = false
+    var blockWasSuccessfullyStored = false
 
     putBlockInfo.synchronized {
       logTrace("Put for block %s took %s to get into synchronized block"
@@ -792,11 +820,11 @@ private[spark] class BlockManager(
         }
 
         val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
-        if (putBlockStatus.storageLevel != StorageLevel.NONE) {
+        blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+        if (blockWasSuccessfullyStored) {
           // Now that the block is in either the memory, externalBlockStore, or disk store,
           // let other threads read it, and tell the master about it.
-          marked = true
-          putBlockInfo.markReady(size)
+          putBlockInfo.size = size
           if (tellMaster) {
             reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
           }
@@ -805,13 +833,10 @@ private[spark] class BlockManager(
           }
         }
       } finally {
-        // If we failed in putting the block to memory/disk, notify other possible readers
-        // that it has failed, and then remove it from the block info map.
-        if (!marked) {
-          // Note that the remove must happen before markFailure otherwise another thread
-          // could've inserted a new BlockInfo before we remove it.
-          blockInfo.remove(blockId)
-          putBlockInfo.markFailure()
+        if (blockWasSuccessfullyStored) {
+          blockInfoManager.downgradeLock(blockId)
+        } else {
+          blockInfoManager.removeBlock(blockId)
           logWarning(s"Putting block $blockId failed")
         }
       }
@@ -852,7 +877,7 @@ private[spark] class BlockManager(
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     }
 
-    marked
+    blockWasSuccessfullyStored
   }
 
   /**
@@ -989,77 +1014,63 @@ private[spark] class BlockManager(
    * store reaches its limit and needs to free up space.
    *
    * If `data` is not put on disk, it won't be created.
+   *
+   * The caller of this method must hold a write lock on the block before calling this method.
+   * This method does not release the write lock.
+   *
+   * @return the block's new effective StorageLevel.
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: () => Either[Array[Any], ByteBuffer]): Unit = {
-
+      data: () => Either[Array[Any], ByteBuffer]): StorageLevel = {
     logInfo(s"Dropping block $blockId from memory")
-    val info = blockInfo.get(blockId)
-
-    // If the block has not already been dropped
-    if (info != null) {
-      info.synchronized {
-        // required ? As of now, this will be invoked only for blocks which are ready
-        // But in case this changes in future, adding for consistency sake.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning(s"Block $blockId was marked as failure. Nothing to drop")
-          return
-        } else if (blockInfo.asScala.get(blockId).isEmpty) {
-          logWarning(s"Block $blockId was already dropped.")
-          return
-        }
-        var blockIsUpdated = false
-        val level = info.level
-
-        // Drop to disk, if storage level requires
-        if (level.useDisk && !diskStore.contains(blockId)) {
-          logInfo(s"Writing block $blockId to disk")
-          data() match {
-            case Left(elements) =>
-              diskStore.putArray(blockId, elements, level, returnValues = false)
-            case Right(bytes) =>
-              diskStore.putBytes(blockId, bytes, level)
-          }
-          blockIsUpdated = true
-        }
+    val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
+    var blockIsUpdated = false
+    val level = info.level
+
+    // Drop to disk, if storage level requires
+    if (level.useDisk && !diskStore.contains(blockId)) {
+      logInfo(s"Writing block $blockId to disk")
+      data() match {
+        case Left(elements) =>
+          diskStore.putArray(blockId, elements, level, returnValues = false)
+        case Right(bytes) =>
+          diskStore.putBytes(blockId, bytes, level)
+      }
+      blockIsUpdated = true
+    }
 
-        // Actually drop from memory store
-        val droppedMemorySize =
-          if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
-        val blockIsRemoved = memoryStore.remove(blockId)
-        if (blockIsRemoved) {
-          blockIsUpdated = true
-        } else {
-          logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
-        }
+    // Actually drop from memory store
+    val droppedMemorySize =
+      if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
+    val blockIsRemoved = memoryStore.remove(blockId)
+    if (blockIsRemoved) {
+      blockIsUpdated = true
+    } else {
+      logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
+    }
 
-        val status = getCurrentBlockStatus(blockId, info)
-        if (info.tellMaster) {
-          reportBlockStatus(blockId, info, status, droppedMemorySize)
-        }
-        if (!level.useDisk) {
-          // The block is completely gone from this node; forget it so we can put() it again later.
-          blockInfo.remove(blockId)
-        }
-        if (blockIsUpdated) {
-          Option(TaskContext.get()).foreach { c =>
-            c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status)))
-          }
-        }
+    val status = getCurrentBlockStatus(blockId, info)
+    if (info.tellMaster) {
+      reportBlockStatus(blockId, info, status, droppedMemorySize)
+    }
+    if (blockIsUpdated) {
+      Option(TaskContext.get()).foreach { c =>
+        c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status)))
       }
     }
+    status.storageLevel
   }
 
   /**
    * Remove all blocks belonging to the given RDD.
+   *
    * @return The number of blocks removed.
    */
   def removeRdd(rddId: Int): Int = {
     // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
     logInfo(s"Removing RDD $rddId")
-    val blocksToRemove = blockInfo.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+    val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
     blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
     blocksToRemove.size
   }
@@ -1069,7 +1080,7 @@ private[spark] class BlockManager(
    */
   def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
     logDebug(s"Removing broadcast $broadcastId")
-    val blocksToRemove = blockInfo.asScala.keys.collect {
+    val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
       case bid @ BroadcastBlockId(`broadcastId`, _) => bid
     }
     blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
@@ -1081,9 +1092,11 @@ private[spark] class BlockManager(
    */
   def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
     logDebug(s"Removing block $blockId")
-    val info = blockInfo.get(blockId)
-    if (info != null) {
-      info.synchronized {
+    blockInfoManager.lockForWriting(blockId) match {
+      case None =>
+        // The block has already been removed; do nothing.
+        logWarning(s"Asked to remove block $blockId, which does not exist")
+      case Some(info) =>
         // Removals are idempotent in disk store and memory store. At worst, we get a warning.
         val removedFromMemory = memoryStore.remove(blockId)
         val removedFromDisk = diskStore.remove(blockId)
@@ -1091,15 +1104,11 @@ private[spark] class BlockManager(
           logWarning(s"Block $blockId could not be removed as it was not found in either " +
             "the disk, memory, or external block store")
         }
-        blockInfo.remove(blockId)
+        blockInfoManager.removeBlock(blockId)
         if (tellMaster && info.tellMaster) {
           val status = getCurrentBlockStatus(blockId, info)
           reportBlockStatus(blockId, info, status)
         }
-      }
-    } else {
-      // The block has already been removed; do nothing.
-      logWarning(s"Asked to remove block $blockId, which does not exist")
     }
   }
 
@@ -1174,7 +1183,7 @@ private[spark] class BlockManager(
     }
     diskBlockManager.stop()
     rpcEnv.stop(slaveEndpoint)
-    blockInfo.clear()
+    blockInfoManager.clear()
     memoryStore.clear()
     diskStore.clear()
     futureExecutionContext.shutdownNow()

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
new file mode 100644
index 0000000..5886b9c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+
+/**
+ * This [[ManagedBuffer]] wraps a [[ByteBuffer]] which was retrieved from the [[BlockManager]]
+ * so that the corresponding block's read lock can be released once this buffer's references
+ * are released.
+ *
+ * This is effectively a wrapper / bridge to connect the BlockManager's notion of read locks
+ * to the network layer's notion of retain / release counts.
+ */
+private[storage] class BlockManagerManagedBuffer(
+    blockManager: BlockManager,
+    blockId: BlockId,
+    buf: ByteBuffer) extends NioManagedBuffer(buf) {
+
+  override def retain(): ManagedBuffer = {
+    super.retain()
+    val locked = blockManager.blockInfoManager.lockForReading(blockId, blocking = false)
+    assert(locked.isDefined)
+    this
+  }
+
+  override def release(): ManagedBuffer = {
+    blockManager.releaseLock(blockId)
+    super.release()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index 69985c9..6f6a677 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -60,8 +60,10 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
 
   /**
    * Remove a block, if it exists.
+   *
    * @param blockId the block to remove.
    * @return True if the block was found and removed, False otherwise.
+   * @throws IllegalStateException if the block is pinned by a task.
    */
   def remove(blockId: BlockId): Boolean
 

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 024b660..2f16c8f 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -95,7 +95,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       val values = blockManager.dataDeserialize(blockId, bytes)
       putIterator(blockId, values, level, returnValues = true)
     } else {
-      tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+      tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
@@ -127,11 +127,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       returnValues: Boolean): PutResult = {
     if (level.deserialized) {
       val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
-      tryToPut(blockId, values, sizeEstimate, deserialized = true)
+      tryToPut(blockId, () => values, sizeEstimate, deserialized = true)
       PutResult(sizeEstimate, Left(values.iterator))
     } else {
       val bytes = blockManager.dataSerialize(blockId, values.iterator)
-      tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+      tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
@@ -208,7 +208,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
   }
 
   override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
-    val entry = entries.synchronized { entries.remove(blockId) }
+    val entry = entries.synchronized {
+      entries.remove(blockId)
+    }
     if (entry != null) {
       memoryManager.releaseStorageMemory(entry.size)
       logDebug(s"Block $blockId of size ${entry.size} dropped " +
@@ -327,14 +329,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     blockId.asRDDId.map(_.rddId)
   }
 
-  private def tryToPut(
-      blockId: BlockId,
-      value: Any,
-      size: Long,
-      deserialized: Boolean): Boolean = {
-    tryToPut(blockId, () => value, size, deserialized)
-  }
-
   /**
    * Try to put in a set of values, if we can free up enough space. The value should either be
    * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
@@ -410,6 +404,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       var freedMemory = 0L
       val rddToAdd = blockId.flatMap(getRddId)
       val selectedBlocks = new ArrayBuffer[BlockId]
+      def blockIsEvictable(blockId: BlockId): Boolean = {
+        rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
+      }
       // This is synchronized to ensure that the set of entries is not changed
       // (because of getValue or getBytes) while traversing the iterator, as that
       // can lead to exceptions.
@@ -418,9 +415,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
         while (freedMemory < space && iterator.hasNext) {
           val pair = iterator.next()
           val blockId = pair.getKey
-          if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
-            selectedBlocks += blockId
-            freedMemory += pair.getValue.size
+          if (blockIsEvictable(blockId)) {
+            // We don't want to evict blocks which are currently being read, so we need to obtain
+            // an exclusive write lock on blocks which are candidates for eviction. We perform a
+            // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
+            if (blockManager.blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
+              selectedBlocks += blockId
+              freedMemory += pair.getValue.size
+            }
           }
         }
       }
@@ -438,7 +440,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
             } else {
               Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
             }
-            blockManager.dropFromMemory(blockId, () => data)
+            val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
+            if (newEffectiveStorageLevel.isValid) {
+              // The block is still present in at least one store, so release the lock
+              // but don't delete the block info
+              blockManager.releaseLock(blockId)
+            } else {
+              // The block isn't present in any store, so delete the block info so that the
+              // block can be stored again
+              blockManager.blockInfoManager.removeBlock(blockId)
+            }
           }
         }
         freedMemory
@@ -447,6 +458,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
           logInfo(s"Will not store $id as it would require dropping another block " +
             "from the same RDD")
         }
+        selectedBlocks.foreach { id =>
+          blockManager.releaseLock(id)
+        }
         0L
       }
     }
@@ -463,6 +477,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
 
   /**
    * Reserve memory for unrolling the given block for this task.
+   *
    * @return whether the request is granted.
    */
   def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 48a0282..ffc02bc 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -87,6 +87,7 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
     val context = TaskContext.empty()
     try {
       TaskContext.setTaskContext(context)
+      sc.env.blockManager.registerTask(0)
       cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
       assert(context.taskMetrics.updatedBlockStatuses.size === 2)
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 7b02380..d1e806b 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -485,7 +485,8 @@ class CleanerTester(
   def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
     try {
       eventually(waitTimeout, interval(100 millis)) {
-        assert(isAllCleanedUp)
+        assert(isAllCleanedUp,
+          "The following resources were not cleaned up:\n" + uncleanedResourcesToString)
       }
       postCleanupValidate()
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index a0483f6..c1484b0 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.serializer.KryoDistributedTest._
 import org.apache.spark.util.Utils
 
-class KryoSerializerDistributedSuite extends SparkFunSuite {
+class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContext {
 
   test("kryo objects are serialised consistently in different processes") {
     val conf = new SparkConf(false)
@@ -34,7 +34,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite {
     val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
     conf.setJars(List(jar.getPath))
 
-    val sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val original = Thread.currentThread.getContextClassLoader
     val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
     SparkEnv.get.serializer.setDefaultClassLoader(loader)
@@ -47,8 +47,6 @@ class KryoSerializerDistributedSuite extends SparkFunSuite {
 
     // Join the two RDDs, and force evaluation
     assert(shuffledRDD.join(cachedRDD).collect().size == 1)
-
-    LocalSparkContext.stop(sc)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
new file mode 100644
index 0000000..662b18f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.language.implicitConversions
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.{SparkException, SparkFunSuite, TaskContext, TaskContextImpl}
+
+
+class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  private implicit val ec = ExecutionContext.global
+  private var blockInfoManager: BlockInfoManager = _
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    blockInfoManager = new BlockInfoManager()
+    for (t <- 0 to 4) {
+      blockInfoManager.registerTask(t)
+    }
+  }
+
+  override protected def afterEach(): Unit = {
+    try {
+      blockInfoManager = null
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  private implicit def stringToBlockId(str: String): BlockId = {
+    TestBlockId(str)
+  }
+
+  private def newBlockInfo(): BlockInfo = {
+    new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false)
+  }
+
+  private def withTaskId[T](taskAttemptId: Long)(block: => T): T = {
+    try {
+      TaskContext.setTaskContext(new TaskContextImpl(0, 0, taskAttemptId, 0, null, null))
+      block
+    } finally {
+      TaskContext.unset()
+    }
+  }
+
+  test("initial memory usage") {
+    assert(blockInfoManager.size === 0)
+  }
+
+  test("get non-existent block") {
+    assert(blockInfoManager.get("non-existent-block").isEmpty)
+    assert(blockInfoManager.lockForReading("non-existent-block").isEmpty)
+    assert(blockInfoManager.lockForWriting("non-existent-block").isEmpty)
+  }
+
+  test("basic lockNewBlockForWriting") {
+    val initialNumMapEntries = blockInfoManager.getNumberOfMapEntries
+    val blockInfo = newBlockInfo()
+    withTaskId(1) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", blockInfo))
+      assert(blockInfoManager.get("block").get eq blockInfo)
+      assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      assert(blockInfoManager.get("block").get eq blockInfo)
+      assert(blockInfo.readerCount === 0)
+      assert(blockInfo.writerTask === 1)
+      blockInfoManager.unlock("block")
+      assert(blockInfo.readerCount === 0)
+      assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+    }
+    assert(blockInfoManager.size === 1)
+    assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 1)
+  }
+
+  test("read locks are reentrant") {
+    withTaskId(1) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+      assert(blockInfoManager.lockForReading("block").isDefined)
+      assert(blockInfoManager.lockForReading("block").isDefined)
+      assert(blockInfoManager.get("block").get.readerCount === 2)
+      assert(blockInfoManager.get("block").get.writerTask === BlockInfo.NO_WRITER)
+      blockInfoManager.unlock("block")
+      assert(blockInfoManager.get("block").get.readerCount === 1)
+      blockInfoManager.unlock("block")
+      assert(blockInfoManager.get("block").get.readerCount === 0)
+    }
+  }
+
+  test("multiple tasks can hold read locks") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+    }
+    withTaskId(1) { assert(blockInfoManager.lockForReading("block").isDefined) }
+    withTaskId(2) { assert(blockInfoManager.lockForReading("block").isDefined) }
+    withTaskId(3) { assert(blockInfoManager.lockForReading("block").isDefined) }
+    withTaskId(4) { assert(blockInfoManager.lockForReading("block").isDefined) }
+    assert(blockInfoManager.get("block").get.readerCount === 4)
+  }
+
+  test("single task can hold write lock") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+    }
+    withTaskId(1) {
+      assert(blockInfoManager.lockForWriting("block").isDefined)
+      assert(blockInfoManager.get("block").get.writerTask === 1)
+    }
+    withTaskId(2) {
+      assert(blockInfoManager.lockForWriting("block", blocking = false).isEmpty)
+      assert(blockInfoManager.get("block").get.writerTask === 1)
+    }
+  }
+
+  test("cannot call lockForWriting while already holding a write lock") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+    }
+    withTaskId(1) {
+      assert(blockInfoManager.lockForWriting("block").isDefined)
+      intercept[IllegalStateException] {
+        blockInfoManager.lockForWriting("block")
+      }
+      blockInfoManager.assertBlockIsLockedForWriting("block")
+    }
+  }
+
+  test("assertBlockIsLockedForWriting throws exception if block is not locked") {
+    intercept[SparkException] {
+      blockInfoManager.assertBlockIsLockedForWriting("block")
+    }
+    withTaskId(BlockInfo.NON_TASK_WRITER) {
+      intercept[SparkException] {
+        blockInfoManager.assertBlockIsLockedForWriting("block")
+      }
+    }
+  }
+
+  test("downgrade lock") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.downgradeLock("block")
+    }
+    withTaskId(1) {
+      assert(blockInfoManager.lockForReading("block").isDefined)
+    }
+    assert(blockInfoManager.get("block").get.readerCount === 2)
+    assert(blockInfoManager.get("block").get.writerTask === BlockInfo.NO_WRITER)
+  }
+
+  test("write lock will block readers") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    val get1Future = Future {
+      withTaskId(1) {
+        blockInfoManager.lockForReading("block")
+      }
+    }
+    val get2Future = Future {
+      withTaskId(2) {
+        blockInfoManager.lockForReading("block")
+      }
+    }
+    Thread.sleep(300)  // Hack to try to ensure that both future tasks are waiting
+    withTaskId(0) {
+      blockInfoManager.unlock("block")
+    }
+    assert(Await.result(get1Future, 1.seconds).isDefined)
+    assert(Await.result(get2Future, 1.seconds).isDefined)
+    assert(blockInfoManager.get("block").get.readerCount === 2)
+  }
+
+  test("read locks will block writer") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+      blockInfoManager.lockForReading("block")
+    }
+    val write1Future = Future {
+      withTaskId(1) {
+        blockInfoManager.lockForWriting("block")
+      }
+    }
+    val write2Future = Future {
+      withTaskId(2) {
+        blockInfoManager.lockForWriting("block")
+      }
+    }
+    Thread.sleep(300)  // Hack to try to ensure that both future tasks are waiting
+    withTaskId(0) {
+      blockInfoManager.unlock("block")
+    }
+    assert(
+      Await.result(Future.firstCompletedOf(Seq(write1Future, write2Future)), 1.seconds).isDefined)
+    val firstWriteWinner = if (write1Future.isCompleted) 1 else 2
+    withTaskId(firstWriteWinner) {
+      blockInfoManager.unlock("block")
+    }
+    assert(Await.result(write1Future, 1.seconds).isDefined)
+    assert(Await.result(write2Future, 1.seconds).isDefined)
+  }
+
+  test("removing a non-existent block throws IllegalArgumentException") {
+    withTaskId(0) {
+      intercept[IllegalArgumentException] {
+        blockInfoManager.removeBlock("non-existent-block")
+      }
+    }
+  }
+
+  test("removing a block without holding any locks throws IllegalStateException") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+      intercept[IllegalStateException] {
+        blockInfoManager.removeBlock("block")
+      }
+    }
+  }
+
+  test("removing a block while holding only a read lock throws IllegalStateException") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+      assert(blockInfoManager.lockForReading("block").isDefined)
+      intercept[IllegalStateException] {
+        blockInfoManager.removeBlock("block")
+      }
+    }
+  }
+
+  test("removing a block causes blocked callers to receive None") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    val getFuture = Future {
+      withTaskId(1) {
+        blockInfoManager.lockForReading("block")
+      }
+    }
+    val writeFuture = Future {
+      withTaskId(2) {
+        blockInfoManager.lockForWriting("block")
+      }
+    }
+    Thread.sleep(300)  // Hack to try to ensure that both future tasks are waiting
+    withTaskId(0) {
+      blockInfoManager.removeBlock("block")
+    }
+    assert(Await.result(getFuture, 1.seconds).isEmpty)
+    assert(Await.result(writeFuture, 1.seconds).isEmpty)
+  }
+
+  test("releaseAllLocksForTask releases write locks") {
+    val initialNumMapEntries = blockInfoManager.getNumberOfMapEntries
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 3)
+    blockInfoManager.releaseAllLocksForTask(0)
+    assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 3fd6fb4..a94d8b4 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -190,6 +190,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
 
     def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
       stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
+      stores.head.releaseLock(blockId)
       val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
       stores.foreach { _.removeBlock(blockId) }
       master.removeBlock(blockId)
@@ -251,6 +252,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     // Insert a block with 2x replication and return the number of copies of the block
     def replicateAndGetNumCopies(blockId: String): Int = {
       store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
+      store.releaseLock(blockId)
       val numLocations = master.getLocations(blockId).size
       allStores.foreach { _.removeBlock(blockId) }
       numLocations
@@ -288,6 +290,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
       val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
       initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      initialStores.head.releaseLock(blockId)
       val numLocations = master.getLocations(blockId).size
       allStores.foreach { _.removeBlock(blockId) }
       numLocations
@@ -355,6 +358,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
       val blockId = new TestBlockId(
         "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
       stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      stores(0).releaseLock(blockId)
 
       // Assert that master know two locations for the block
       val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -367,6 +371,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
       }.foreach { testStore =>
         val testStoreName = testStore.blockManagerId.executorId
         assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+        testStore.releaseLock(blockId)
         assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
           s"master does not have status for ${blockId.name} in $testStoreName")
 
@@ -392,6 +397,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
           (1 to 10).foreach {
             i =>
               testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
+              testStore.releaseLock(s"dummy-block-$i")
           }
           (1 to 10).foreach {
             i => testStore.removeBlock(s"dummy-block-$i")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org