You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/28 01:08:26 UTC

[1/2] [SPARK-1777] Prevent OOMs from single partitions

Repository: spark
Updated Branches:
  refs/heads/master f6ff2a61d -> ecf30ee7e


http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 23cb690..dd4fd53 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.Matchers
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
@@ -43,6 +43,7 @@ import scala.language.postfixOps
 
 class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   with PrivateMethodTester {
+
   private val conf = new SparkConf(false)
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -61,21 +62,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
   def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
 
+  private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+    new BlockManager(
+      name, actorSystem, master, serializer, maxMem, conf, securityMgr, mapOutputTracker)
+  }
+
   before {
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf,
-      securityManager = securityMgr)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      "test", "localhost", 0, conf = conf, securityManager = securityMgr)
     this.actorSystem = actorSystem
-    conf.set("spark.driver.port", boundPort.toString)
-
-    master = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
-      conf)
 
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     oldArch = System.setProperty("os.arch", "amd64")
     conf.set("os.arch", "amd64")
     conf.set("spark.test.useCompressedOops", "true")
     conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
+    conf.set("spark.driver.port", boundPort.toString)
+    conf.set("spark.storage.unrollFraction", "0.4")
+    conf.set("spark.storage.unrollMemoryThreshold", "512")
+
+    master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      conf)
+
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
   }
@@ -138,11 +147,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("master + 1 manager interaction") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(20000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
 
     // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -169,10 +177,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("master + 2 managers interaction") {
-    store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
-    store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000, "exec1")
+    store2 = makeBlockManager(2000, "exec2")
 
     val peers = master.getPeers(store.blockManagerId, 1)
     assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -187,11 +193,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("removing block") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(20000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
 
     // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
     store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
@@ -200,8 +205,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
-    assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
-    assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
+    assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
+    assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
     assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
     assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
     assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
@@ -230,17 +235,16 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       val memStatus = master.getMemoryStatus.head._2
-      memStatus._1 should equal (2000L)
-      memStatus._2 should equal (2000L)
+      memStatus._1 should equal (20000L)
+      memStatus._2 should equal (20000L)
     }
   }
 
   test("removing rdd") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(20000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     // Putting a1, a2 and a3 in memory.
     store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
@@ -270,11 +274,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("removing broadcast") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000)
     val driverStore = store
-    val executorStore = new BlockManager("executor", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    val executorStore = makeBlockManager(2000, "executor")
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -343,8 +345,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
   test("reregistration on heart beat") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -380,8 +381,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
   test("reregistration doesn't dead lock") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
     val a2 = List(new Array[Byte](400))
 
@@ -390,7 +390,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       master.removeExecutor(store.blockManagerId.executorId)
       val t1 = new Thread {
         override def run() {
-          store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+          store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
         }
       }
       val t2 = new Thread {
@@ -418,19 +418,14 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("correct BlockResult returned from get() calls") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
-      mapOutputTracker)
-    val list1 = List(new Array[Byte](200), new Array[Byte](200))
-    val list1ForSizeEstimate = new ArrayBuffer[Any]
-    list1ForSizeEstimate ++= list1.iterator
-    val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
-    val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
-    val list2ForSizeEstimate = new ArrayBuffer[Any]
-    list2ForSizeEstimate ++= list2.iterator
-    val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    store = makeBlockManager(12000)
+    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
+    val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
+    val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
@@ -451,11 +446,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
@@ -471,11 +465,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU storage with serialization") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
@@ -491,11 +484,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU for partitions of same RDD") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
@@ -511,11 +503,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store = makeBlockManager(12000)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // At this point rdd_1_1 should've replaced rdd_0_1
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@@ -523,8 +514,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // Do a get() on rdd_0_2 so that it is the most recently used item
     assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
-    store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
     assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -538,8 +529,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar.
     val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
     if (tachyonUnitTestEnabled) {
-      store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-        securityMgr, mapOutputTracker)
+      store = makeBlockManager(1200)
       val a1 = new Array[Byte](400)
       val a2 = new Array[Byte](400)
       val a3 = new Array[Byte](400)
@@ -555,8 +545,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("on-disk storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -569,11 +558,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
@@ -585,11 +573,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage with getLocalBytes") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
@@ -601,11 +588,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage with serialization") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
@@ -617,11 +603,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
@@ -633,12 +618,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("LRU with mixed storage levels") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
-    val a4 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
+    val a4 = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
@@ -656,14 +640,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU with streams") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list1 = List(new Array[Byte](200), new Array[Byte](200))
-    val list2 = List(new Array[Byte](200), new Array[Byte](200))
-    val list3 = List(new Array[Byte](200), new Array[Byte](200))
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store = makeBlockManager(12000)
+    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     assert(store.get("list3").isDefined, "list3 was not in store")
@@ -672,7 +655,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(store.get("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
     assert(store.get("list2").isDefined, "list2 was not in store")
@@ -681,16 +664,15 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("LRU with mixed storage levels and streams") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list1 = List(new Array[Byte](200), new Array[Byte](200))
-    val list2 = List(new Array[Byte](200), new Array[Byte](200))
-    val list3 = List(new Array[Byte](200), new Array[Byte](200))
-    val list4 = List(new Array[Byte](200), new Array[Byte](200))
+    store = makeBlockManager(12000)
+    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val listForSizeEstimate = new ArrayBuffer[Any]
     listForSizeEstimate ++= list1.iterator
     val listSize = SizeEstimator.estimate(listForSizeEstimate)
@@ -708,7 +690,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(store.get("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+    store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
@@ -731,11 +713,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("overly large block") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf,
-      securityMgr, mapOutputTracker)
-    store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+    store = makeBlockManager(5000)
+    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingle("a1") === None, "a1 was in store")
-    store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
     assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
     assert(store.getSingle("a2").isDefined, "a2 was not in store")
   }
@@ -743,8 +724,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   test("block compression") {
     try {
       conf.set("spark.shuffle.compress", "true")
-      store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
+      store = makeBlockManager(20000, "exec1")
       store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
@@ -752,52 +732,46 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       store = null
 
       conf.set("spark.shuffle.compress", "false")
-      store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
+      store = makeBlockManager(20000, "exec2")
+      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
       store.stop()
       store = null
 
       conf.set("spark.broadcast.compress", "true")
-      store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
+      store = makeBlockManager(20000, "exec3")
+      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.broadcast.compress", "false")
-      store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed")
+      store = makeBlockManager(20000, "exec4")
+      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "true")
-      store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed")
+      store = makeBlockManager(20000, "exec5")
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "false")
-      store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed")
+      store = makeBlockManager(20000, "exec6")
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
-      store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
-      assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
+      store = makeBlockManager(20000, "exec7")
+      store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+      assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
       store.stop()
       store = null
     } finally {
@@ -871,30 +845,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(Arrays.equals(mappedAsArray, bytes))
     assert(Arrays.equals(notMappedAsArray, bytes))
   }
-  
+
   test("updated block statuses") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list = List.fill(2)(new Array[Byte](200))
-    val bigList = List.fill(8)(new Array[Byte](200))
+    store = makeBlockManager(12000)
+    val list = List.fill(2)(new Array[Byte](2000))
+    val bigList = List.fill(8)(new Array[Byte](2000))
 
     // 1 updated block (i.e. list1)
     val updatedBlocks1 =
-      store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(updatedBlocks1.size === 1)
     assert(updatedBlocks1.head._1 === TestBlockId("list1"))
     assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
 
     // 1 updated block (i.e. list2)
     val updatedBlocks2 =
-      store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+      store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     assert(updatedBlocks2.size === 1)
     assert(updatedBlocks2.head._1 === TestBlockId("list2"))
     assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
 
     // 2 updated blocks - list1 is kicked out of memory while list3 is added
     val updatedBlocks3 =
-      store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(updatedBlocks3.size === 2)
     updatedBlocks3.foreach { case (id, status) =>
       id match {
@@ -903,11 +876,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
         case _ => fail("Updated block is neither list1 nor list3")
       }
     }
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
 
     // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
     val updatedBlocks4 =
-      store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(updatedBlocks4.size === 2)
     updatedBlocks4.foreach { case (id, status) =>
       id match {
@@ -916,26 +889,37 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
         case _ => fail("Updated block is neither list2 nor list4")
       }
     }
-    assert(store.get("list4").isDefined, "list4 was not in store")
+    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
 
-    // No updated blocks - nothing is kicked out of memory because list5 is too big to be added
+    // No updated blocks - list5 is too big to fit in store and nothing is kicked out
     val updatedBlocks5 =
-      store.put("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(updatedBlocks5.size === 0)
-    assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list4").isDefined, "list4 was not in store")
-    assert(!store.get("list5").isDefined, "list5 was in store")
+
+    // memory store contains only list3 and list4
+    assert(!store.memoryStore.contains("list1"), "list1 was in memory store")
+    assert(!store.memoryStore.contains("list2"), "list2 was in memory store")
+    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
+    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
+    assert(!store.memoryStore.contains("list5"), "list5 was in memory store")
+
+    // disk store contains only list2
+    assert(!store.diskStore.contains("list1"), "list1 was in disk store")
+    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+    assert(!store.diskStore.contains("list3"), "list3 was in disk store")
+    assert(!store.diskStore.contains("list4"), "list4 was in disk store")
+    assert(!store.diskStore.contains("list5"), "list5 was in disk store")
   }
 
   test("query block statuses") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list = List.fill(2)(new Array[Byte](200))
+    store = makeBlockManager(12000)
+    val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
-    store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getLocations("list1").size === 0)
@@ -949,9 +933,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
 
     // This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
-    store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
-    store.put("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.put("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // getLocations should return nothing because the master is not informed
     // getBlockStatus without asking slaves should have the same result
@@ -968,23 +952,22 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("get matching blocks") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list = List.fill(2)(new Array[Byte](10))
+    store = makeBlockManager(12000)
+    val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
-    store.put("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.put("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
     assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
 
     // insert some more blocks
-    store.put("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.put("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.put("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
@@ -992,7 +975,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
-      store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
       case RDDBlockId(1, _) => true
@@ -1002,17 +985,240 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store = makeBlockManager(12000)
+    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
     assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
     // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // rdd_1_0 should have been replaced, even it's not least recently used.
     assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
     assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
     assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
   }
+
+  test("reserve/release unroll memory") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    assert(memoryStore.currentUnrollMemory === 0)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Reserve: every granted request is added to this thread's running total
+    memoryStore.reserveUnrollMemoryForThisThread(100)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 100)
+    memoryStore.reserveUnrollMemoryForThisThread(200)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 300)
+    memoryStore.reserveUnrollMemoryForThisThread(500)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 800)
+    memoryStore.reserveUnrollMemoryForThisThread(1000000)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 800) // not granted: total unchanged
+    // Release: each call subtracts the given amount from this thread's total
+    memoryStore.releaseUnrollMemoryForThisThread(100)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 700)
+    memoryStore.releaseUnrollMemoryForThisThread(100)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 600)
+    // Reserve again: a request that fits is granted on top of what is still held
+    memoryStore.reserveUnrollMemoryForThisThread(4400)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 5000)
+    memoryStore.reserveUnrollMemoryForThisThread(20000)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 5000) // not granted: total unchanged
+    // Release again: first a partial amount, then everything that is left
+    memoryStore.releaseUnrollMemoryForThisThread(1000)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 4000)
+    memoryStore.releaseUnrollMemoryForThisThread() // no argument: release all
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+  }
+
+  /**
+   * Verify the result of MemoryStore#unrollSafely: right kind, no values dropped or altered.
+   */
+  private def verifyUnroll(
+      expected: Iterator[Any],
+      result: Either[Array[Any], Iterator[Any]],
+      shouldBeArray: Boolean): Unit = {
+    val actual: Iterator[Any] = result match {
+      case Left(arr) =>
+        assert(shouldBeArray, "expected iterator from unroll!")
+        arr.iterator
+      case Right(it) =>
+        assert(!shouldBeArray, "expected array from unroll!")
+        it
+    }
+    val (want, got) = (expected.toList, actual.toList) // zip alone hides a length mismatch
+    assert(want.size === got.size, "unroll changed the number of values!")
+    want.zip(got).foreach { case (e, a) =>
+      assert(e === a, "unroll did not return original values!")
+    }
+  }
+
+  test("safely unroll blocks") {
+    store = makeBlockManager(12000)
+    val smallList = List.fill(40)(new Array[Byte](100))
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    val memoryStore = store.memoryStore
+    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll with all the space in the world. This should succeed and return an array.
+    var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+    verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll with not enough space. This should succeed after kicking out someBlock1.
+    store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+    verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(droppedBlocks.size === 1)
+    assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
+    droppedBlocks.clear()
+
+    // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
+    // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
+    // In the meantime, however, we kicked out someBlock2 before giving up.
+    store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks)
+    verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
+    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // still held: we got an iterator
+    assert(droppedBlocks.size === 1)
+    assert(droppedBlocks.head._1 === TestBlockId("someBlock2"))
+    droppedBlocks.clear()
+  }
+
+  test("safely unroll blocks through putIterator") {
+    store = makeBlockManager(12000)
+    val memOnly = StorageLevel.MEMORY_ONLY
+    val memoryStore = store.memoryStore
+    val smallList = List.fill(40)(new Array[Byte](100))
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] // def: new iterator per use
+    def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]] // def: new iterator per use
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll with plenty of space. This should succeed and cache both blocks.
+    val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+    val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+    assert(memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(result1.size > 0) // unroll was successful
+    assert(result2.size > 0)
+    assert(result1.data.isLeft) // unroll did not drop this block to disk
+    assert(result2.data.isLeft)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
+    // would not know how to drop them from memory later.
+    memoryStore.remove("b1")
+    memoryStore.remove("b2")
+    store.putIterator("b1", smallIterator, memOnly)
+    store.putIterator("b2", smallIterator, memOnly)
+
+    // Unroll with not enough space. This should succeed but kick out b1 in the process.
+    val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+    assert(result3.size > 0)
+    assert(result3.data.isLeft)
+    assert(!memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    memoryStore.remove("b3") // same re-put as above, so block manager tracks b3 as well
+    store.putIterator("b3", smallIterator, memOnly)
+
+    // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+    val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
+    assert(result4.size === 0) // unroll was unsuccessful
+    assert(result4.data.isLeft)
+    assert(!memoryStore.contains("b1"))
+    assert(!memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(!memoryStore.contains("b4"))
+    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+  }
+
+  /**
+   * This test is essentially identical to the preceding one, except that it uses MEMORY_AND_DISK.
+   */
+  test("safely unroll blocks through putIterator (disk)") {
+    store = makeBlockManager(12000)
+    val memAndDisk = StorageLevel.MEMORY_AND_DISK
+    val memoryStore = store.memoryStore
+    val diskStore = store.diskStore
+    val smallList = List.fill(40)(new Array[Byte](100))
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] // def: new iterator per use
+    def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]] // def: new iterator per use
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    store.putIterator("b1", smallIterator, memAndDisk)
+    store.putIterator("b2", smallIterator, memAndDisk)
+
+    // Unroll with not enough space. This should succeed but kick out b1 in the process.
+    // Memory store should contain b2 and b3, while disk store should contain only b1
+    val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true)
+    assert(result3.size > 0)
+    assert(!memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(diskStore.contains("b1"))
+    assert(!diskStore.contains("b2"))
+    assert(!diskStore.contains("b3"))
+    memoryStore.remove("b3")
+    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY) // MEMORY_ONLY, unlike b1/b2
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll huge block with not enough space. This should fail and drop the new block to disk
+    // directly in addition to kicking out b2 in the process. Memory store should contain only
+    // b3, while disk store should contain b1, b2 and b4.
+    val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true)
+    assert(result4.size > 0)
+    assert(result4.data.isRight) // unroll returned bytes from disk
+    assert(!memoryStore.contains("b1"))
+    assert(!memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(!memoryStore.contains("b4"))
+    assert(diskStore.contains("b1"))
+    assert(diskStore.contains("b2"))
+    assert(!diskStore.contains("b3"))
+    assert(diskStore.contains("b4"))
+    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+  }
+
+  test("multiple unrolls by the same thread") {
+    store = makeBlockManager(12000)
+    val memOnly = StorageLevel.MEMORY_ONLY
+    val memoryStore = store.memoryStore
+    val smallList = List.fill(40)(new Array[Byte](100))
+    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] // def: new iterator per use
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // All unroll memory used is released because unrollSafely returned an array
+    memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll memory is not released because unrollSafely returned an iterator
+    // that still depends on the underlying vector used in the process
+    memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisThread
+    assert(unrollMemoryAfterB3 > 0)
+
+    // The unroll memory owned by this thread builds on top of its value after the previous unrolls
+    memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisThread
+    assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
+
+    // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
+    memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisThread
+    memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisThread
+    memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisThread
+    assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) // no growth past the value reached at b4
+    assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
+    assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
deleted file mode 100644
index 93f0c6a..0000000
--- a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.util.Random
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
-import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
-
-class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
-  val NORMAL_ERROR = 0.20
-  val HIGH_ERROR = 0.30
-
-  test("fixed size insertions") {
-    testWith[Int, Long](10000, i => (i, i.toLong))
-    testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
-    testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass()))
-  }
-
-  test("variable size insertions") {
-    val rand = new Random(123456789)
-    def randString(minLen: Int, maxLen: Int): String = {
-      "a" * (rand.nextInt(maxLen - minLen) + minLen)
-    }
-    testWith[Int, String](10000, i => (i, randString(0, 10)))
-    testWith[Int, String](10000, i => (i, randString(0, 100)))
-    testWith[Int, String](10000, i => (i, randString(90, 100)))
-  }
-
-  test("updates") {
-    val rand = new Random(123456789)
-    def randString(minLen: Int, maxLen: Int): String = {
-      "a" * (rand.nextInt(maxLen - minLen) + minLen)
-    }
-    testWith[String, Int](10000, i => (randString(0, 10000), i))
-  }
-
-  def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
-    val map = new SizeTrackingAppendOnlyMap[K, V]()
-    for (i <- 0 until numElements) {
-      val (k, v) = makeElement(i)
-      map(k) = v
-      expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
-    }
-  }
-
-  def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
-    val betterEstimatedSize = SizeEstimator.estimate(obj)
-    assert(betterEstimatedSize * (1 - error) < estimatedSize,
-      s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
-    assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
-      s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
-  }
-}
-
-object SizeTrackingAppendOnlyMapSuite {
-  // Speed test, for reproducibility of results.
-  // These could be highly non-deterministic in general, however.
-  // Results:
-  // AppendOnlyMap:   31 ms
-  // SizeTracker:     54 ms
-  // SizeEstimator: 1500 ms
-  def main(args: Array[String]) {
-    val numElements = 100000
-
-    val baseTimes = for (i <- 0 until 10) yield time {
-      val map = new AppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-      }
-    }
-
-    val sampledTimes = for (i <- 0 until 10) yield time {
-      val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-        map.estimateSize()
-      }
-    }
-
-    val unsampledTimes = for (i <- 0 until 3) yield time {
-      val map = new AppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-        SizeEstimator.estimate(map)
-      }
-    }
-
-    println("Base: " + baseTimes)
-    println("SizeTracker (sampled): " + sampledTimes)
-    println("SizeEstimator (unsampled): " + unsampledTimes)
-  }
-
-  def time(f: => Unit): Long = {
-    val start = System.currentTimeMillis()
-    f
-    System.currentTimeMillis() - start
-  }
-
-  private class LargeDummyClass {
-    val arr = new Array[Int](100)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
new file mode 100644
index 0000000..1f33967
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.SizeEstimator
+
+class SizeTrackerSuite extends FunSuite {
+  val NORMAL_ERROR = 0.20 // relative tolerance from the 33rd insertion onwards
+  val HIGH_ERROR = 0.30 // looser relative tolerance for the first 32 insertions
+
+  import SizeTrackerSuite._
+
+  test("vector fixed size insertions") {
+    testVector[Long](10000, i => i.toLong)
+    testVector[(Long, Long)](10000, i => (i.toLong, i.toLong))
+    testVector[LargeDummyClass](10000, i => new LargeDummyClass)
+  }
+
+  test("vector variable size insertions") {
+    val rand = new Random(123456789) // fixed seed: the same string lengths on every run
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testVector[String](10000, i => randString(0, 10))
+    testVector[String](10000, i => randString(0, 100))
+    testVector[String](10000, i => randString(90, 100))
+  }
+
+  test("map fixed size insertions") {
+    testMap[Int, Long](10000, i => (i, i.toLong))
+    testMap[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
+    testMap[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass))
+  }
+
+  test("map variable size insertions") {
+    val rand = new Random(123456789) // fixed seed: the same string lengths on every run
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testMap[Int, String](10000, i => (i, randString(0, 10)))
+    testMap[Int, String](10000, i => (i, randString(0, 100)))
+    testMap[Int, String](10000, i => (i, randString(90, 100)))
+  }
+
+  test("map updates") {
+    val rand = new Random(123456789) // fixed seed: the same string lengths on every run
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testMap[String, Int](10000, i => (randString(0, 10000), i)) // keys may repeat
+  }
+
+  def testVector[T: ClassTag](numElements: Int, makeElement: Int => T) {
+    val vector = new SizeTrackingVector[T]
+    for (i <- 0 until numElements) {
+      val item = makeElement(i)
+      vector += item
+      expectWithinError(vector, vector.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+    }
+  }
+
+  def testMap[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
+    val map = new SizeTrackingAppendOnlyMap[K, V]
+    for (i <- 0 until numElements) {
+      val (k, v) = makeElement(i)
+      map(k) = v
+      expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+    }
+  }
+
+  def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
+    val betterEstimatedSize = SizeEstimator.estimate(obj) // reference value to compare against
+    assert(betterEstimatedSize * (1 - error) < estimatedSize,
+      s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
+    assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize, // upper slack is 2x the lower
+      s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
+  }
+}
+
+private object SizeTrackerSuite {
+
+  /**
+   * Run speed tests for size tracking collections.
+   */
+  def main(args: Array[String]): Unit = {
+    if (args.size < 1) {
+      println("Usage: SizeTrackerSuite [num elements]")
+      System.exit(1)
+    }
+    val numElements = args(0).toInt // throws NumberFormatException on a non-numeric argument
+    vectorSpeedTest(numElements)
+    mapSpeedTest(numElements)
+  }
+
+  /**
+   * Speed test for SizeTrackingVector.
+   *
+   * Results for 100000 elements (possibly non-deterministic):
+   *   PrimitiveVector  15 ms
+   *   SizeTracker      51 ms
+   *   SizeEstimator    2000 ms
+   */
+  def vectorSpeedTest(numElements: Int): Unit = {
+    val baseTimes = for (i <- 0 until 10) yield time {
+      val vector = new PrimitiveVector[LargeDummyClass]
+      for (i <- 0 until numElements) {
+        vector += new LargeDummyClass
+      }
+    }
+    val sampledTimes = for (i <- 0 until 10) yield time {
+      val vector = new SizeTrackingVector[LargeDummyClass]
+      for (i <- 0 until numElements) {
+        vector += new LargeDummyClass
+        vector.estimateSize()
+      }
+    }
+    val unsampledTimes = for (i <- 0 until 3) yield time { // 3 runs instead of 10 (slow variant)
+      val vector = new PrimitiveVector[LargeDummyClass]
+      for (i <- 0 until numElements) {
+        vector += new LargeDummyClass
+        SizeEstimator.estimate(vector)
+      }
+    }
+    printSpeedTestResult("SizeTrackingVector", baseTimes, sampledTimes, unsampledTimes)
+  }
+
+  /**
+   * Speed test for SizeTrackingAppendOnlyMap.
+   *
+   * Results for 100000 elements (possibly non-deterministic):
+   *   AppendOnlyMap  30 ms
+   *   SizeTracker    41 ms
+   *   SizeEstimator  1666 ms
+   */
+  def mapSpeedTest(numElements: Int): Unit = {
+    val baseTimes = for (i <- 0 until 10) yield time {
+      val map = new AppendOnlyMap[Int, LargeDummyClass]
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass
+      }
+    }
+    val sampledTimes = for (i <- 0 until 10) yield time {
+      val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass
+        map.estimateSize()
+      }
+    }
+    val unsampledTimes = for (i <- 0 until 3) yield time { // 3 runs instead of 10 (slow variant)
+      val map = new AppendOnlyMap[Int, LargeDummyClass]
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass
+        SizeEstimator.estimate(map)
+      }
+    }
+    printSpeedTestResult("SizeTrackingAppendOnlyMap", baseTimes, sampledTimes, unsampledTimes)
+  }
+
+  def printSpeedTestResult(
+      testName: String,
+      baseTimes: Seq[Long],
+      sampledTimes: Seq[Long],
+      unsampledTimes: Seq[Long]): Unit = {
+    println(s"Average times for $testName (ms):")
+    println("  Base - " + averageTime(baseTimes))
+    println("  SizeTracker (sampled) - " + averageTime(sampledTimes))
+    println("  SizeEstimator (unsampled) - " + averageTime(unsampledTimes))
+    println()
+  }
+
+  def time(f: => Unit): Long = { // wall-clock milliseconds spent evaluating f once
+    val start = System.currentTimeMillis()
+    f
+    System.currentTimeMillis() - start
+  }
+
+  def averageTime(v: Seq[Long]): Long = {
+    v.sum / v.size // Long division: truncates to whole ms; v must be non-empty
+  }
+
+  private class LargeDummyClass {
+    val arr = new Array[Int](100)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 46e3dd9..2e6c85c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -481,6 +481,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.storage.unrollFraction</code></td>
+  <td>0.2</td>
+  <td>
+    Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
+    This is dynamically allocated by dropping existing blocks when there is not enough free
+    storage space to unroll the new block in its entirety.
+  </td>
+</tr>
+<tr>
   <td><code>spark.tachyonStore.baseDir</code></td>
   <td>System.getProperty("java.io.tmpdir")</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index e9220db..5ff88f0 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -31,7 +31,6 @@ import com.typesafe.tools.mima.core._
  * MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
  */
 object MimaExcludes {
-
     def excludes(version: String) =
       version match {
         case v if v.startsWith("1.1") =>
@@ -63,6 +62,15 @@ object MimaExcludes {
               "org.apache.spark.storage.MemoryStore.Entry")
           ) ++
           Seq(
+            // Renamed putValues -> putArray + putIterator
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.MemoryStore.putValues"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.DiskStore.putValues"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.TachyonStore.putValues")
+          ) ++
+          Seq(
             ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
           ) ++
           Seq( // Ignore some private methods in ALS.

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index ce8316b..d934b9c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -110,8 +110,7 @@ private[streaming] class ReceiverSupervisorImpl(
     ) {
     val blockId = optionalBlockId.getOrElse(nextBlockId)
     val time = System.currentTimeMillis
-    blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
-      storageLevel, tellMaster = true)
+    blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
     logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time)  + " ms")
     reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
   }
@@ -124,7 +123,7 @@ private[streaming] class ReceiverSupervisorImpl(
     ) {
     val blockId = optionalBlockId.getOrElse(nextBlockId)
     val time = System.currentTimeMillis
-    blockManager.put(blockId, iterator, storageLevel, tellMaster = true)
+    blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
     logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time)  + " ms")
     reportPushedBlock(blockId, -1, optionalMetadata)
   }


[2/2] git commit: [SPARK-1777] Prevent OOMs from single partitions

Posted by ma...@apache.org.
[SPARK-1777] Prevent OOMs from single partitions

**Problem.** When caching, we currently unroll the entire RDD partition before making sure we have enough free memory. This is a common cause of OOMs, especially when (1) the BlockManager has little free space left in memory, and (2) the partition is large.

**Solution.** We maintain a global memory pool of `M` bytes shared across all threads, similar to the way we currently manage memory for shuffle aggregation. Then, while we unroll each partition, periodically check if there is enough space to continue. If not, drop enough RDD blocks to ensure we have at least `M` bytes to work with, then try again. If we still don't have enough space to unroll the partition, give up and drop the block to disk directly if applicable.

**New configurations.**
- `spark.storage.unrollFraction` - the value of `M` as a fraction of the storage memory. (default: 0.2)
- `spark.storage.safetyFraction` - a margin of safety in case size estimation is slightly off. This is the equivalent of the existing `spark.shuffle.safetyFraction`. (default: 0.9)

For more detail, see the [design document](https://issues.apache.org/jira/secure/attachment/12651793/spark-1777-design-doc.pdf). Tests pending for performance and memory usage patterns.

Author: Andrew Or <an...@gmail.com>

Closes #1165 from andrewor14/them-rdd-memories and squashes the following commits:

e77f451 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
c7c8832 [Andrew Or] Simplify logic + update a few comments
269d07b [Andrew Or] Very minor changes to tests
6645a8a [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b7e165c [Andrew Or] Add new tests for unrolling blocks
f12916d [Andrew Or] Slightly clean up tests
71672a7 [Andrew Or] Update unrollSafely tests
369ad07 [Andrew Or] Correct ensureFreeSpace and requestMemory behavior
f4d035c [Andrew Or] Allow one thread to unroll multiple blocks
a66fbd2 [Andrew Or] Rename a few things + update comments
68730b3 [Andrew Or] Fix weird scalatest behavior
e40c60d [Andrew Or] Fix MIMA excludes
ff77aa1 [Andrew Or] Fix tests
1a43c06 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b9a6eee [Andrew Or] Simplify locking behavior on unrollMemoryMap
ed6cda4 [Andrew Or] Formatting fix (super minor)
f9ff82e [Andrew Or] putValues -> putIterator + putArray
beb368f [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8448c9b [Andrew Or] Fix tests
a49ba4d [Andrew Or] Do not expose unroll memory check period
69bc0a5 [Andrew Or] Always synchronize on putLock before unrollMemoryMap
3f5a083 [Andrew Or] Simplify signature of ensureFreeSpace
dce55c8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8288228 [Andrew Or] Synchronize put and unroll properly
4f18a3d [Andrew Or] bufferFraction -> unrollFraction
28edfa3 [Andrew Or] Update a few comments / log messages
728323b [Andrew Or] Do not synchronize every 1000 elements
5ab2329 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
129c441 [Andrew Or] Fix bug: Use toArray rather than array
9a65245 [Andrew Or] Update a few comments + minor control flow changes
57f8d85 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
abeae4f [Andrew Or] Add comment clarifying the MEMORY_AND_DISK case
3dd96aa [Andrew Or] AppendOnlyBuffer -> Vector (+ a few small changes)
f920531 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
0871835 [Andrew Or] Add an effective storage level interface to BlockManager
64e7d4c [Andrew Or] Add/modify a few comments (minor)
8af2f35 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
4f4834e [Andrew Or] Use original storage level for blocks dropped to disk
ecc8c2d [Andrew Or] Fix binary incompatibility
24185ea [Andrew Or] Avoid dropping a block back to disk if reading from disk
2b7ee66 [Andrew Or] Fix bug in SizeTracking*
9b9a273 [Andrew Or] Fix tests
20eb3e5 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
649bdb3 [Andrew Or] Document spark.storage.bufferFraction
a10b0e7 [Andrew Or] Add initial memory request threshold + rename a few things
e9c3cb0 [Andrew Or] cacheMemoryMap -> unrollMemoryMap
198e374 [Andrew Or] Unfold -> unroll
0d50155 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d9d02a8 [Andrew Or] Remove unused param in unfoldSafely
ec728d8 [Andrew Or] Add tests for safe unfolding of blocks
22b2209 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
078eb83 [Andrew Or] Add check for hasNext in PrimitiveVector.iterator
0871535 [Andrew Or] Fix tests in BlockManagerSuite
d68f31e [Andrew Or] Safely unfold blocks for all memory puts
5961f50 [Andrew Or] Fix tests
195abd7 [Andrew Or] Refactor: move unfold logic to MemoryStore
1e82d00 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
3ce413e [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d5dd3b4 [Andrew Or] Free buffer memory in finally
ea02eec [Andrew Or] Fix tests
b8e1d9c [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
a8704c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
e1b8b25 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
87aa75c [Andrew Or] Fix mima excludes again (typo)
11eb921 [Andrew Or] Clarify comment (minor)
50cae44 [Andrew Or] Remove now duplicate mima exclude
7de5ef9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
df47265 [Andrew Or] Fix binary incompatibility
6d05a81 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
f94f5af [Andrew Or] Update a few comments (minor)
776aec9 [Andrew Or] Prevent OOM if a single RDD partition is too large
bbd3eea [Andrew Or] Fix CacheManagerSuite to use Array
97ea499 [Andrew Or] Change BlockManager interface to use Arrays
c12f093 [Andrew Or] Add SizeTrackingAppendOnlyBuffer and tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecf30ee7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecf30ee7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecf30ee7

Branch: refs/heads/master
Commit: ecf30ee7e78ea59c462c54db0fde5328f997466c
Parents: f6ff2a6
Author: Andrew Or <an...@gmail.com>
Authored: Sun Jul 27 16:08:16 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sun Jul 27 16:08:16 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   |  72 ++-
 .../main/scala/org/apache/spark/SparkEnv.scala  |   2 +-
 .../org/apache/spark/executor/Executor.scala    |   4 +-
 .../org/apache/spark/storage/BlockManager.scala | 110 ++--
 .../org/apache/spark/storage/BlockStore.scala   |   6 +-
 .../org/apache/spark/storage/DiskStore.scala    |  12 +-
 .../org/apache/spark/storage/MemoryStore.scala  | 256 +++++++-
 .../org/apache/spark/storage/TachyonStore.scala |  12 +-
 .../apache/spark/storage/ThreadingTest.scala    |   2 +-
 .../org/apache/spark/util/SizeEstimator.scala   |   2 +-
 .../spark/util/collection/PrimitiveVector.scala |  15 +-
 .../spark/util/collection/SizeTracker.scala     | 105 ++++
 .../collection/SizeTrackingAppendOnlyMap.scala  |  71 +--
 .../util/collection/SizeTrackingVector.scala    |  46 ++
 .../org/apache/spark/CacheManagerSuite.scala    |  25 +-
 .../spark/storage/BlockManagerSuite.scala       | 594 +++++++++++++------
 .../util/SizeTrackingAppendOnlyMapSuite.scala   | 120 ----
 .../util/collection/SizeTrackerSuite.scala      | 204 +++++++
 docs/configuration.md                           |   9 +
 project/MimaExcludes.scala                      |  10 +-
 .../receiver/ReceiverSupervisorImpl.scala       |   5 +-
 21 files changed, 1165 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 8f86768..5ddda4d 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark
 
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
@@ -30,7 +30,7 @@ import org.apache.spark.storage._
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
 
   /** Keys of RDD partitions that are being computed/loaded. */
-  private val loading = new HashSet[RDDBlockId]()
+  private val loading = new mutable.HashSet[RDDBlockId]
 
   /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](
@@ -118,21 +118,29 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   }
 
   /**
-   * Cache the values of a partition, keeping track of any updates in the storage statuses
-   * of other blocks along the way.
+   * Cache the values of a partition, keeping track of any updates in the storage statuses of
+   * other blocks along the way.
+   *
+   * The effective storage level refers to the level that actually specifies BlockManager put
+   * behavior, not the level originally specified by the user. This is mainly for forcing a
+   * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
+   * while preserving the original semantics of the RDD as specified by the application.
    */
   private def putInBlockManager[T](
       key: BlockId,
       values: Iterator[T],
-      storageLevel: StorageLevel,
-      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
-
-    if (!storageLevel.useMemory) {
-      /* This RDD is not to be cached in memory, so we can just pass the computed values
-       * as an iterator directly to the BlockManager, rather than first fully unrolling
-       * it in memory. The latter option potentially uses much more memory and risks OOM
-       * exceptions that can be avoided. */
-      updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+      level: StorageLevel,
+      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
+      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
+
+    val putLevel = effectiveStorageLevel.getOrElse(level)
+    if (!putLevel.useMemory) {
+      /*
+       * This RDD is not to be cached in memory, so we can just pass the computed values as an
+       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
+       */
+      updatedBlocks ++=
+        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
       blockManager.get(key) match {
         case Some(v) => v.data.asInstanceOf[Iterator[T]]
         case None =>
@@ -140,14 +148,36 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           throw new BlockException(key, s"Block manager failed to return cached value for $key!")
       }
     } else {
-      /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+      /*
+       * This RDD is to be cached in memory. In this case we cannot pass the computed values
        * to the BlockManager as an iterator and expect to read it back later. This is because
-       * we may end up dropping a partition from memory store before getting it back, e.g.
-       * when the entirety of the RDD does not fit in memory. */
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-      elements.iterator.asInstanceOf[Iterator[T]]
+       * we may end up dropping a partition from memory store before getting it back.
+       *
+       * In addition, we must be careful to not unroll the entire partition in memory at once.
+       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
+       * single partition. Instead, we unroll the values cautiously, potentially aborting and
+       * dropping the partition to disk if applicable.
+       */
+      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
+        case Left(arr) =>
+          // We have successfully unrolled the entire partition, so cache it in memory
+          updatedBlocks ++=
+            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
+          arr.iterator.asInstanceOf[Iterator[T]]
+        case Right(it) =>
+          // There is not enough space to cache this partition in memory
+          logWarning(s"Not enough space to cache partition $key in memory! " +
+            s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
+          val returnValues = it.asInstanceOf[Iterator[T]]
+          if (putLevel.useDisk) {
+            logWarning(s"Persisting partition $key to disk instead.")
+            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
+              useOffHeap = false, deserialized = false, putLevel.replication)
+            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
+          } else {
+            returnValues
+          }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 8f70744..6ee731b 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -67,7 +67,7 @@ class SparkEnv (
     val metricsSystem: MetricsSystem,
     val conf: SparkConf) extends Logging {
 
-  // A mapping of thread ID to amount of memory used for shuffle in bytes
+  // A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b16133b..3b69bc4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -266,11 +266,13 @@ private[spark] class Executor(
           }
         }
       } finally {
-        // TODO: Unregister shuffle memory only for ResultTask
+        // Release memory used by this thread for shuffles
         val shuffleMemoryMap = env.shuffleMemoryMap
         shuffleMemoryMap.synchronized {
           shuffleMemoryMap.remove(Thread.currentThread().getId)
         }
+        // Release memory used by this thread for unrolling blocks
+        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
         runningTasks.remove(taskId)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0db0a5b..d746526 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util._
 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
-private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
+private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
 
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
@@ -71,9 +71,9 @@ private[spark] class BlockManager(
 
   // Actual storage of where blocks are kept
   private var tachyonInitialized = false
-  private[storage] val memoryStore = new MemoryStore(this, maxMemory)
-  private[storage] val diskStore = new DiskStore(this, diskBlockManager)
-  private[storage] lazy val tachyonStore: TachyonStore = {
+  private[spark] val memoryStore = new MemoryStore(this, maxMemory)
+  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+  private[spark] lazy val tachyonStore: TachyonStore = {
     val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
     val appFolderName = conf.get("spark.tachyonStore.folderName")
     val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
@@ -463,16 +463,17 @@ private[spark] class BlockManager(
               val values = dataDeserialize(blockId, bytes)
               if (level.deserialized) {
                 // Cache the values before returning them
-                // TODO: Consider creating a putValues that also takes in a iterator?
-                val valuesBuffer = new ArrayBuffer[Any]
-                valuesBuffer ++= values
-                memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
-                  match {
-                    case Left(values2) =>
-                      return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
-                    case _ =>
-                      throw new SparkException("Memory store did not return back an iterator")
-                  }
+                val putResult = memoryStore.putIterator(
+                  blockId, values, level, returnValues = true, allowPersistToDisk = false)
+                // The put may or may not have succeeded, depending on whether there was enough
+                // space to unroll the block. Either way, the put here should return an iterator.
+                putResult.data match {
+                  case Left(it) =>
+                    return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+                  case _ =>
+                    // This only happens if we dropped the values back to disk (which is never)
+                    throw new SparkException("Memory store did not return an iterator!")
+                }
               } else {
                 return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
               }
@@ -561,13 +562,14 @@ private[spark] class BlockManager(
     iter
   }
 
-  def put(
+  def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(values != null, "Values is null")
-    doPut(blockId, IteratorValues(values), level, tellMaster)
+    doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
   }
 
   /**
@@ -589,13 +591,14 @@ private[spark] class BlockManager(
    * Put a new block of values to the block manager.
    * Return a list of blocks updated as a result of this put.
    */
-  def put(
+  def putArray(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(values != null, "Values is null")
-    doPut(blockId, ArrayBufferValues(values), level, tellMaster)
+    doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
   }
 
   /**
@@ -606,19 +609,33 @@ private[spark] class BlockManager(
       blockId: BlockId,
       bytes: ByteBuffer,
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
+    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
   }
 
+  /**
+   * Put the given block according to the given level in one of the block stores, replicating
+   * the values if necessary.
+   *
+   * The effective storage level refers to the level according to which the block will actually be
+   * handled. This allows the caller to specify an alternate behavior of doPut while preserving
+   * the original level specified by the user.
+   */
   private def doPut(
       blockId: BlockId,
       data: BlockValues,
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None)
+    : Seq[(BlockId, BlockStatus)] = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
+    effectiveStorageLevel.foreach { level =>
+      require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
+    }
 
     // Return value
     val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -657,13 +674,16 @@ private[spark] class BlockManager(
     // Size of the block in bytes
     var size = 0L
 
+    // The level we actually use to put the block
+    val putLevel = effectiveStorageLevel.getOrElse(level)
+
     // If we're storing bytes, then initiate the replication before storing them locally.
     // This is faster as data is already serialized and ready to send.
     val replicationFuture = data match {
-      case b: ByteBufferValues if level.replication > 1 =>
+      case b: ByteBufferValues if putLevel.replication > 1 =>
         // Duplicate doesn't copy the bytes, but just creates a wrapper
         val bufferView = b.buffer.duplicate()
-        Future { replicate(blockId, bufferView, level) }
+        Future { replicate(blockId, bufferView, putLevel) }
       case _ => null
     }
 
@@ -676,18 +696,18 @@ private[spark] class BlockManager(
         // returnValues - Whether to return the values put
         // blockStore - The type of storage to put these values into
         val (returnValues, blockStore: BlockStore) = {
-          if (level.useMemory) {
+          if (putLevel.useMemory) {
             // Put it in memory first, even if it also has useDisk set to true;
             // We will drop it to disk later if the memory store can't hold it.
             (true, memoryStore)
-          } else if (level.useOffHeap) {
+          } else if (putLevel.useOffHeap) {
             // Use tachyon for off-heap storage
             (false, tachyonStore)
-          } else if (level.useDisk) {
+          } else if (putLevel.useDisk) {
             // Don't get back the bytes from put unless we replicate them
-            (level.replication > 1, diskStore)
+            (putLevel.replication > 1, diskStore)
           } else {
-            assert(level == StorageLevel.NONE)
+            assert(putLevel == StorageLevel.NONE)
             throw new BlockException(
               blockId, s"Attempted to put block $blockId without specifying storage level!")
           }
@@ -696,22 +716,22 @@ private[spark] class BlockManager(
         // Actually put the values
         val result = data match {
           case IteratorValues(iterator) =>
-            blockStore.putValues(blockId, iterator, level, returnValues)
-          case ArrayBufferValues(array) =>
-            blockStore.putValues(blockId, array, level, returnValues)
+            blockStore.putIterator(blockId, iterator, putLevel, returnValues)
+          case ArrayValues(array) =>
+            blockStore.putArray(blockId, array, putLevel, returnValues)
           case ByteBufferValues(bytes) =>
             bytes.rewind()
-            blockStore.putBytes(blockId, bytes, level)
+            blockStore.putBytes(blockId, bytes, putLevel)
         }
         size = result.size
         result.data match {
-          case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator
+          case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
           case Right (newBytes) => bytesAfterPut = newBytes
           case _ =>
         }
 
         // Keep track of which blocks are dropped from memory
-        if (level.useMemory) {
+        if (putLevel.useMemory) {
           result.droppedBlocks.foreach { updatedBlocks += _ }
         }
 
@@ -742,7 +762,7 @@ private[spark] class BlockManager(
 
     // Either we're storing bytes and we asynchronously started replication, or we're storing
     // values and need to serialize and replicate them now:
-    if (level.replication > 1) {
+    if (putLevel.replication > 1) {
       data match {
         case ByteBufferValues(bytes) =>
           if (replicationFuture != null) {
@@ -758,7 +778,7 @@ private[spark] class BlockManager(
             }
             bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
           }
-          replicate(blockId, bytesAfterPut, level)
+          replicate(blockId, bytesAfterPut, putLevel)
           logDebug("Put block %s remotely took %s"
             .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
       }
@@ -766,7 +786,7 @@ private[spark] class BlockManager(
 
     BlockManager.dispose(bytesAfterPut)
 
-    if (level.replication > 1) {
+    if (putLevel.replication > 1) {
       logDebug("Putting block %s with replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     } else {
@@ -818,7 +838,7 @@ private[spark] class BlockManager(
       value: Any,
       level: StorageLevel,
       tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
-    put(blockId, Iterator(value), level, tellMaster)
+    putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
   /**
@@ -829,7 +849,7 @@ private[spark] class BlockManager(
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId).orNull
@@ -853,7 +873,7 @@ private[spark] class BlockManager(
           logInfo(s"Writing block $blockId to disk")
           data match {
             case Left(elements) =>
-              diskStore.putValues(blockId, elements, level, returnValues = false)
+              diskStore.putArray(blockId, elements, level, returnValues = false)
             case Right(bytes) =>
               diskStore.putBytes(blockId, bytes, level)
           }
@@ -1068,9 +1088,11 @@ private[spark] class BlockManager(
 private[spark] object BlockManager extends Logging {
   private val ID_GENERATOR = new IdGenerator
 
+  /** Return the total amount of storage memory available. */
   private def getMaxMemory(conf: SparkConf): Long = {
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
-    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
+    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
+    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
 
   def getHeartBeatFrequency(conf: SparkConf): Long =

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index b9b53b1..69985c9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -37,15 +37,15 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
    * @return a PutResult that contains the size of the data, as well as the values put if
    *         returnValues is true (if not, the result's data field can be null)
    */
-  def putValues(
+  def putIterator(
     blockId: BlockId,
     values: Iterator[Any],
     level: StorageLevel,
     returnValues: Boolean): PutResult
 
-  def putValues(
+  def putArray(
     blockId: BlockId,
-    values: ArrayBuffer[Any],
+    values: Array[Any],
     level: StorageLevel,
     returnValues: Boolean): PutResult
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index ebff0cb..c83261d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -21,8 +21,6 @@ import java.io.{FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
@@ -30,7 +28,7 @@ import org.apache.spark.util.Utils
 /**
  * Stores BlockManager blocks on disk.
  */
-private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
+private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
   extends BlockStore(blockManager) with Logging {
 
   val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
@@ -57,15 +55,15 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putValues(blockId, values.toIterator, level, returnValues)
+    putIterator(blockId, values.toIterator, level, returnValues)
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 71f66c8..28f675c 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -20,27 +20,45 @@ package org.apache.spark.storage
 import java.nio.ByteBuffer
 import java.util.LinkedHashMap
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.util.{SizeEstimator, Utils}
+import org.apache.spark.util.collection.SizeTrackingVector
 
 private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
 
 /**
- * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
+ * Stores blocks in memory, either as Arrays of deserialized Java objects or as
  * serialized ByteBuffers.
  */
-private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
+private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   extends BlockStore(blockManager) {
 
+  private val conf = blockManager.conf
   private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
+
   @volatile private var currentMemory = 0L
-  // Object used to ensure that only one thread is putting blocks and if necessary, dropping
-  // blocks from the memory store.
-  private val putLock = new Object()
+
+  // Ensure only one thread is putting, and if necessary, dropping blocks at any given time
+  private val accountingLock = new Object
+
+  // A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
+  // All accesses of this map are assumed to have manually synchronized on `accountingLock`
+  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+
+  /**
+   * The amount of space ensured for unrolling values in memory, shared across all cores.
+   * This space is not reserved in advance, but allocated dynamically by dropping existing blocks.
+   */
+  private val maxUnrollMemory: Long = {
+    val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2)
+    (maxMemory * unrollFraction).toLong
+  }
 
   logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
 
+  /** Free memory not occupied by existing blocks; memory reserved for unrolling is not deducted. */
   def freeMemory: Long = maxMemory - currentMemory
 
   override def getSize(blockId: BlockId): Long = {
@@ -55,20 +73,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     bytes.rewind()
     if (level.deserialized) {
       val values = blockManager.dataDeserialize(blockId, bytes)
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
-      val putAttempt = tryToPut(blockId, elements, sizeEstimate, deserialized = true)
-      PutResult(sizeEstimate, Left(values.toIterator), putAttempt.droppedBlocks)
+      putIterator(blockId, values, level, returnValues = true)
     } else {
       val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
     }
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
     if (level.deserialized) {
@@ -82,14 +96,52 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    val valueEntries = new ArrayBuffer[Any]()
-    valueEntries ++= values
-    putValues(blockId, valueEntries, level, returnValues)
+    putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
+  }
+
+  /**
+   * Attempt to put the given block in memory store.
+   *
+   * There may not be enough space to fully unroll the iterator in memory, in which case we
+   * optionally drop the values to disk if
+   *   (1) the block's storage level specifies useDisk, and
+   *   (2) `allowPersistToDisk` is true.
+   *
+   * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
+   * back from disk and attempts to cache it in memory. In this case, we should not persist the
+   * block back on disk again, as it is already in disk store.
+   */
+  private[storage] def putIterator(
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel,
+      returnValues: Boolean,
+      allowPersistToDisk: Boolean): PutResult = {
+    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+    val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
+    unrolledValues match {
+      case Left(arrayValues) =>
+        // Values are fully unrolled in memory, so store them as an array
+        val res = putArray(blockId, arrayValues, level, returnValues)
+        droppedBlocks ++= res.droppedBlocks
+        PutResult(res.size, res.data, droppedBlocks)
+      case Right(iteratorValues) =>
+        // Not enough space to unroll this block; drop to disk if applicable
+        logWarning(s"Not enough space to store block $blockId in memory! " +
+          s"Free memory is $freeMemory bytes.")
+        if (level.useDisk && allowPersistToDisk) {
+          logWarning(s"Persisting block $blockId to disk instead.")
+          val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
+          PutResult(res.size, res.data, droppedBlocks)
+        } else {
+          PutResult(0, Left(iteratorValues), droppedBlocks)
+        }
+    }
   }
 
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -99,7 +151,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     if (entry == null) {
       None
     } else if (entry.deserialized) {
-      Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
+      Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
     } else {
       Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
     }
@@ -112,7 +164,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     if (entry == null) {
       None
     } else if (entry.deserialized) {
-      Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+      Some(entry.value.asInstanceOf[Array[Any]].iterator)
     } else {
       val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
       Some(blockManager.dataDeserialize(blockId, buffer))
@@ -141,6 +193,93 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   /**
+   * Unroll the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM exceptions caused by
+   * unrolling the entirety of the block in memory at once. This is achieved by periodically
+   * checking whether the memory restrictions for unrolling blocks are still satisfied,
+   * stopping immediately if not. This check is a safeguard against the scenario in which
+   * there is not enough free memory to accommodate the entirety of a single block.
+   *
+   * This method returns either an array with the contents of the entire block or an iterator
+   * containing the values of the block (if the array would have exceeded available memory).
+   */
+  def unrollSafely(
+      blockId: BlockId,
+      values: Iterator[Any],
+      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+    : Either[Array[Any], Iterator[Any]] = {
+
+    // Number of elements unrolled so far
+    var elementsUnrolled = 0
+    // Whether there is still enough memory for us to continue unrolling this block
+    var keepUnrolling = true
+    // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
+    val initialMemoryThreshold = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+    // How often to check whether we need to request more memory
+    val memoryCheckPeriod = 16
+    // Memory currently reserved by this thread for this particular unrolling operation
+    var memoryThreshold = initialMemoryThreshold
+    // Memory to request as a multiple of current vector size
+    val memoryGrowthFactor = 1.5
+    // Previous unroll memory held by this thread, for releasing later (only at the very end)
+    val previousMemoryReserved = currentUnrollMemoryForThisThread
+    // Underlying vector for unrolling the block
+    var vector = new SizeTrackingVector[Any]
+
+    // Request enough memory to begin unrolling
+    keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)
+
+    // Unroll this block safely, checking whether we have exceeded our threshold periodically
+    try {
+      while (values.hasNext && keepUnrolling) {
+        vector += values.next()
+        if (elementsUnrolled % memoryCheckPeriod == 0) {
+          // If our vector's size has exceeded the threshold, request more memory
+          val currentSize = vector.estimateSize()
+          if (currentSize >= memoryThreshold) {
+            val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
+            // Hold the accounting lock, in case another thread concurrently puts a block that
+            // takes up the unrolling space we just ensured here
+            accountingLock.synchronized {
+              if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
+                // If the first request is not granted, try again after ensuring free space
+                // If there is still not enough space, give up and drop the partition
+                val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
+                if (spaceToEnsure > 0) {
+                  val result = ensureFreeSpace(blockId, spaceToEnsure)
+                  droppedBlocks ++= result.droppedBlocks
+                }
+                keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
+              }
+            }
+            // New threshold is currentSize * memoryGrowthFactor
+            memoryThreshold = currentSize + amountToRequest
+          }
+        }
+        elementsUnrolled += 1
+      }
+
+      if (keepUnrolling) {
+        // We successfully unrolled the entirety of this block
+        Left(vector.toArray)
+      } else {
+        // We ran out of space while unrolling the values for this block
+        Right(vector.iterator ++ values)
+      }
+
+    } finally {
+      // If we return an array, the values returned do not depend on the underlying vector and
+      // we can immediately free up space for other threads. Otherwise, if we return an iterator,
+      // we release the memory claimed by this thread later on when the task finishes.
+      if (keepUnrolling) {
+        val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
+        releaseUnrollMemoryForThisThread(amountToRelease)
+      }
+    }
+  }
+
+  /**
    * Return the RDD ID that a given block ID is from, or None if it is not an RDD block.
    */
   private def getRddId(blockId: BlockId): Option[Int] = {
@@ -149,10 +288,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
 
   /**
    * Try to put in a set of values, if we can free up enough space. The value should either be
-   * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
-   * size must also be passed by the caller.
+   * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
+   * must also be passed by the caller.
    *
-   * Lock on the object putLock to ensure that all the put requests and its associated block
+   * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
    * dropping is done by only one thread at a time. Otherwise while one thread is dropping
    * blocks to free memory for one block, another thread may use up the freed space for
    * another block.
@@ -174,7 +313,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     var putSuccess = false
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
 
-    putLock.synchronized {
+    accountingLock.synchronized {
       val freeSpaceResult = ensureFreeSpace(blockId, size)
       val enoughFreeSpace = freeSpaceResult.success
       droppedBlocks ++= freeSpaceResult.droppedBlocks
@@ -193,7 +332,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
         // disk if the block allows disk storage.
         val data = if (deserialized) {
-          Left(value.asInstanceOf[ArrayBuffer[Any]])
+          Left(value.asInstanceOf[Array[Any]])
         } else {
           Right(value.asInstanceOf[ByteBuffer].duplicate())
         }
@@ -210,12 +349,14 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
    * don't fit into memory that we want to avoid).
    *
-   * Assume that a lock is held by the caller to ensure only one thread is dropping blocks.
-   * Otherwise, the freed space may fill up before the caller puts in their new value.
+   * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping
+   * blocks. Otherwise, the freed space may fill up before the caller puts in their new value.
    *
    * Return whether there is enough free space, along with the blocks dropped in the process.
    */
-  private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = {
+  private def ensureFreeSpace(
+      blockIdToAdd: BlockId,
+      space: Long): ResultWithDroppedBlocks = {
     logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")
 
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -225,9 +366,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       return ResultWithDroppedBlocks(success = false, droppedBlocks)
     }
 
-    if (maxMemory - currentMemory < space) {
+    // Take into account the amount of memory currently occupied by unrolling blocks
+    val actualFreeMemory = freeMemory - currentUnrollMemory
+
+    if (actualFreeMemory < space) {
       val rddToAdd = getRddId(blockIdToAdd)
-      val selectedBlocks = new ArrayBuffer[BlockId]()
+      val selectedBlocks = new ArrayBuffer[BlockId]
       var selectedMemory = 0L
 
       // This is synchronized to ensure that the set of entries is not changed
@@ -235,7 +379,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       // can lead to exceptions.
       entries.synchronized {
         val iterator = entries.entrySet().iterator()
-        while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+        while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
           val pair = iterator.next()
           val blockId = pair.getKey
           if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
@@ -245,7 +389,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         }
       }
 
-      if (maxMemory - (currentMemory - selectedMemory) >= space) {
+      if (actualFreeMemory + selectedMemory >= space) {
         logInfo(s"${selectedBlocks.size} blocks selected for dropping")
         for (blockId <- selectedBlocks) {
           val entry = entries.synchronized { entries.get(blockId) }
@@ -254,7 +398,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           // future safety.
           if (entry != null) {
             val data = if (entry.deserialized) {
-              Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
+              Left(entry.value.asInstanceOf[Array[Any]])
             } else {
               Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
             }
@@ -275,8 +419,56 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   override def contains(blockId: BlockId): Boolean = {
     entries.synchronized { entries.containsKey(blockId) }
   }
+
+  /**
+   * Reserve additional memory for unrolling blocks used by this thread.
+   * Return whether the request is granted.
+   */
+  private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
+    accountingLock.synchronized {
+      val granted = freeMemory > currentUnrollMemory + memory
+      if (granted) {
+        val threadId = Thread.currentThread().getId
+        unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
+      }
+      granted
+    }
+  }
+
+  /**
+   * Release memory used by this thread for unrolling blocks.
+   * If the amount is not specified, remove the current thread's allocation altogether.
+   */
+  private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
+    val threadId = Thread.currentThread().getId
+    accountingLock.synchronized {
+      if (memory < 0) {
+        unrollMemoryMap.remove(threadId)
+      } else {
+        unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory
+        // If this thread claims no more unroll memory, release it completely
+        if (unrollMemoryMap(threadId) <= 0) {
+          unrollMemoryMap.remove(threadId)
+        }
+      }
+    }
+  }
+
+  /**
+   * Return the amount of memory currently occupied for unrolling blocks across all threads.
+   */
+  private[spark] def currentUnrollMemory: Long = accountingLock.synchronized {
+    unrollMemoryMap.values.sum
+  }
+
+  /**
+   * Return the amount of memory currently occupied for unrolling blocks by this thread.
+   */
+  private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
+    unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
+  }
 }
 
-private case class ResultWithDroppedBlocks(
+private[spark] case class ResultWithDroppedBlocks(
     success: Boolean,
     droppedBlocks: Seq[(BlockId, BlockStatus)])

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index d8ff4ff..932b561 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,8 +20,6 @@ package org.apache.spark.storage
 import java.io.IOException
 import java.nio.ByteBuffer
 
-import scala.collection.mutable.ArrayBuffer
-
 import tachyon.client.{ReadType, WriteType}
 
 import org.apache.spark.Logging
@@ -30,7 +28,7 @@ import org.apache.spark.util.Utils
 /**
  * Stores BlockManager blocks on Tachyon.
  */
-private class TachyonStore(
+private[spark] class TachyonStore(
     blockManager: BlockManager,
     tachyonManager: TachyonBlockManager)
   extends BlockStore(blockManager: BlockManager) with Logging {
@@ -45,15 +43,15 @@ private class TachyonStore(
     putIntoTachyonStore(blockId, bytes, returnValues = true)
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putValues(blockId, values.toIterator, level, returnValues)
+    putIterator(blockId, values.toIterator, level, returnValues)
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 328be15..75c2e09 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -48,7 +48,7 @@ private[spark] object ThreadingTest {
         val block = (1 to blockSize).map(_ => Random.nextInt())
         val level = randomLevel()
         val startTime = System.currentTimeMillis()
-        manager.put(blockId, block.iterator, level, tellMaster = true)
+        manager.putIterator(blockId, block.iterator, level, tellMaster = true)
         println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
         queue.add((blockId, block))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 0846557..bce3b3a 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -180,7 +180,7 @@ private[spark] object SizeEstimator extends Logging {
     }
   }
 
-  // Estimat the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
+  // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
   private val ARRAY_SIZE_FOR_SAMPLING = 200
   private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index b84eb65..7e76d06 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -36,7 +36,7 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
     _array(index)
   }
 
-  def +=(value: V) {
+  def +=(value: V): Unit = {
     if (_numElements == _array.length) {
       resize(_array.length * 2)
     }
@@ -50,6 +50,19 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
 
   def size: Int = _numElements
 
+  def iterator: Iterator[V] = new Iterator[V] {
+    var index = 0
+    override def hasNext: Boolean = index < _numElements
+    override def next(): V = {
+      if (!hasNext) {
+        throw new NoSuchElementException
+      }
+      val value = _array(index)
+      index += 1
+      value
+    }
+  }
+
   /** Gets the underlying array backing this vector. */
   def array: Array[V] = _array
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala
new file mode 100644
index 0000000..3eb1010
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+import org.apache.spark.util.SizeEstimator
+
+/**
+ * A general interface for collections to keep track of their estimated sizes in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
+ * as each call to SizeEstimator is somewhat expensive (order of a few milliseconds).
+ */
+private[spark] trait SizeTracker {
+
+  import SizeTracker._
+
+  /**
+   * Controls the base of the exponential which governs the rate of sampling.
+   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
+   */
+  private val SAMPLE_GROWTH_RATE = 1.1
+
+  /** Samples taken since last resetSamples(). Only the last two are kept for extrapolation. */
+  private val samples = new mutable.Queue[Sample]
+
+  /** The average number of bytes per update between our last two samples. */
+  private var bytesPerUpdate: Double = _
+
+  /** Total number of insertions and updates into the collection since the last resetSamples(). */
+  private var numUpdates: Long = _
+
+  /** The value of 'numUpdates' at which we will take our next sample. */
+  private var nextSampleNum: Long = _
+
+  resetSamples()
+
+  /**
+   * Reset samples collected so far.
+   * This should be called after the collection undergoes a dramatic change in size.
+   */
+  protected def resetSamples(): Unit = {
+    numUpdates = 1
+    nextSampleNum = 1
+    samples.clear()
+    takeSample()
+  }
+
+  /**
+   * Callback to be invoked after every update.
+   */
+  protected def afterUpdate(): Unit = {
+    numUpdates += 1
+    if (nextSampleNum == numUpdates) {
+      takeSample()
+    }
+  }
+
+  /**
+   * Take a new sample of the current collection's size.
+   */
+  private def takeSample(): Unit = {
+    samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
+    // Only use the last two samples to extrapolate
+    if (samples.size > 2) {
+      samples.dequeue()
+    }
+    val bytesDelta = samples.toList.reverse match {
+      case latest :: previous :: tail =>
+        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
+      // If fewer than 2 samples, assume no change
+      case _ => 0
+    }
+    bytesPerUpdate = math.max(0, bytesDelta)
+    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
+  }
+
+  /**
+   * Estimate the current size of the collection in bytes. O(1) time.
+   */
+  def estimateSize(): Long = {
+    assert(samples.nonEmpty)
+    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
+    (samples.last.size + extrapolatedDelta).toLong
+  }
+}
+
+private object SizeTracker {
+  case class Sample(size: Long, numUpdates: Long)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
index 204330d..de61e1d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
@@ -17,85 +17,24 @@
 
 package org.apache.spark.util.collection
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.util.SizeEstimator
-import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
-
 /**
- * Append-only map that keeps track of its estimated size in bytes.
- * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
- * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ * An append-only map that keeps track of its estimated size in bytes.
  */
-private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
-
-  /**
-   * Controls the base of the exponential which governs the rate of sampling.
-   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
-   */
-  private val SAMPLE_GROWTH_RATE = 1.1
-
-  /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
-  private val samples = new ArrayBuffer[Sample]()
-
-  /** Total number of insertions and updates into the map since the last resetSamples(). */
-  private var numUpdates: Long = _
-
-  /** The value of 'numUpdates' at which we will take our next sample. */
-  private var nextSampleNum: Long = _
-
-  /** The average number of bytes per update between our last two samples. */
-  private var bytesPerUpdate: Double = _
-
-  resetSamples()
-
-  /** Called after the map grows in size, as this can be a dramatic change for small objects. */
-  def resetSamples() {
-    numUpdates = 1
-    nextSampleNum = 1
-    samples.clear()
-    takeSample()
-  }
+private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] with SizeTracker {
 
   override def update(key: K, value: V): Unit = {
     super.update(key, value)
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) { takeSample() }
+    super.afterUpdate()
   }
 
   override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
     val newValue = super.changeValue(key, updateFunc)
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) { takeSample() }
+    super.afterUpdate()
     newValue
   }
 
-  /** Takes a new sample of the current map's size. */
-  def takeSample() {
-    samples += Sample(SizeEstimator.estimate(this), numUpdates)
-    // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
-    bytesPerUpdate = math.max(0, samples.toSeq.reverse match {
-      case latest :: previous :: tail =>
-        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
-      case _ =>
-        0
-    })
-    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
-  }
-
-  override protected def growTable() {
+  override protected def growTable(): Unit = {
     super.growTable()
     resetSamples()
   }
-
-  /** Estimates the current size of the map in bytes. O(1) time. */
-  def estimateSize(): Long = {
-    assert(samples.nonEmpty)
-    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
-    (samples.last.size + extrapolatedDelta).toLong
-  }
-}
-
-private object SizeTrackingAppendOnlyMap {
-  case class Sample(size: Long, numUpdates: Long)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala
new file mode 100644
index 0000000..65a7b4e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+
+/**
+ * An append-only buffer (a PrimitiveVector) that tracks its estimated size via SizeTracker.
+ */
+private[spark] class SizeTrackingVector[T: ClassTag]
+  extends PrimitiveVector[T]
+  with SizeTracker {
+
+  override def +=(value: T): Unit = {
+    super.+=(value)
+    super.afterUpdate() // count the append and sample the size when one is due
+  }
+
+  override def resize(newLength: Int): PrimitiveVector[T] = {
+    super.resize(newLength)
+    resetSamples() // samples taken before the resize are discarded
+    this
+  }
+
+  /**
+   * Return a trimmed version of the underlying array, built from this vector's iterator.
+   */
+  def toArray: Array[T] = {
+    super.iterator.toArray
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 7f5d0b0..9c5f394 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.mock.EasyMockSugar
 
@@ -52,22 +50,21 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
   }
 
   test("get uncached rdd") {
-    expecting {
-      blockManager.get(RDDBlockId(0, 0)).andReturn(None)
-      blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY,
-        true).andStubReturn(Seq[(BlockId, BlockStatus)]())
-    }
-
-    whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0)
-      val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
-      assert(value.toList === List(1, 2, 3, 4))
-    }
+    // Do not mock this test, because attempting to match Array[Any], which is not covariant,
+    // in blockManager.put is a losing battle. You have been warned.
+    blockManager = sc.env.blockManager
+    cacheManager = sc.env.cacheManager
+    val context = new TaskContext(0, 0, 0)
+    val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+    val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
+    assert(computeValue.toList === List(1, 2, 3, 4))
+    assert(getValue.isDefined, "Block cached from getOrCompute is not found!")
+    assert(getValue.get.data.toList === List(1, 2, 3, 4))
   }
 
   test("get cached rdd") {
     expecting {
-      val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+      val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
       blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
     }