Posted to commits@spark.apache.org by an...@apache.org on 2016/03/14 22:26:43 UTC

spark git commit: [SPARK-10907][SPARK-6157] Remove pendingUnrollMemory from MemoryStore

Repository: spark
Updated Branches:
  refs/heads/master a48296f4f -> 38529d8f2


[SPARK-10907][SPARK-6157] Remove pendingUnrollMemory from MemoryStore

This patch refactors the MemoryStore to remove the concept of `pendingUnrollMemory`. It also fixes SPARK-6157: "Unrolling with MEMORY_AND_DISK should always release memory".

Key changes:

- Inline `MemoryStore.tryToPut` at its three call sites in the `MemoryStore`.
- Inline `MemoryStore.unrollSafely` at its only call site (in `MemoryStore.putIterator`).
- Inline `MemoryManager.acquireStorageMemory` at its call sites.
- Simplify the code as a result of this inlining (some parameters have fixed values after inlining, so many branches can be removed).
- Remove the `pendingUnrollMemory` map; instead, when a `putIterator` call fails, the amount of unroll memory allocated is returned along with the iterator so that it can later be freed.
- Change `putIterator` to return an instance of `PartiallyUnrolledIterator`, a special iterator subclass which automatically frees the unroll memory of its partially-unrolled elements once the iterator is fully consumed. To handle cases where the iterator is not consumed (e.g. when a MEMORY_ONLY put fails), `PartiallyUnrolledIterator` exposes a `close()` method which may be called to discard the unrolled values and free their memory immediately (see the sketches below).
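
For illustration only (not part of the patch), here is a minimal sketch of the contract callers are now expected to follow, based on the signatures in the diff below. The `putOrFallBack` helper and the `writeToDisk` callback are hypothetical names, and `putIterator` is `private[storage]`, so real callers live inside that package:

    import org.apache.spark.storage.{BlockId, StorageLevel}
    import org.apache.spark.storage.memory.MemoryStore

    // Hypothetical helper illustrating the caller-side contract.
    def putOrFallBack(
        memoryStore: MemoryStore,
        blockId: BlockId,
        values: Iterator[Any],
        level: StorageLevel)(writeToDisk: Iterator[Any] => Unit): Boolean = {
      memoryStore.putIterator(blockId, values, level) match {
        case Right(_) =>
          // Success: the unroll memory was transferred to storage memory,
          // so there is nothing left for the caller to release.
          true
        case Left(partiallyUnrolled) if level.useDisk =>
          // Fully consuming the iterator frees the unroll memory held by the
          // partially-unrolled values (via an internal CompletionIterator).
          writeToDisk(partiallyUnrolled)
          true
        case Left(partiallyUnrolled) =>
          // The values will not be consumed, so close() must be called to
          // release the unroll memory now rather than at task end.
          partiallyUnrolled.close()
          false
      }
    }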
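
As background for the unrolling loop added to the MemoryStore (see the second hunk of the diff), this standalone sketch simulates the memory-request schedule: check the estimated size every 16 elements, and when it reaches the current threshold, grow the reservation toward 1.5x the current size. The per-element size and the initial threshold are made-up numbers; the real values come from SizeEstimator and configuration:

    // Standalone simulation of putIterator's unroll memory-request schedule.
    object UnrollScheduleDemo extends App {
      val memoryCheckPeriod = 16           // elements between size checks
      val memoryGrowthFactor = 1.5         // target multiple of current size
      var memoryThreshold = 1024L * 1024   // illustrative initial reservation
      var reserved = memoryThreshold       // assume every request is granted
      var estimatedSize = 0L
      for (elementsUnrolled <- 0 until 1024) {
        estimatedSize += 64 * 1024         // pretend each element adds 64 KiB
        if (elementsUnrolled % memoryCheckPeriod == 0 && estimatedSize >= memoryThreshold) {
          // Request the gap between 1.5x the current size and what we hold.
          val amountToRequest = (estimatedSize * memoryGrowthFactor - memoryThreshold).toLong
          reserved += amountToRequest
          memoryThreshold += amountToRequest
          println(s"after $elementsUnrolled elements: reserved ${reserved / 1024} KiB")
        }
      }
    }

Because the threshold grows to roughly 1.5x the current size each time it is hit, unrolling a large block triggers only a logarithmic number of memory requests rather than one per element.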

Author: Josh Rosen <jo...@databricks.com>

Closes #11613 from JoshRosen/cleanup-unroll-memory.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38529d8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38529d8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38529d8f

Branch: refs/heads/master
Commit: 38529d8f2350feb1f143aab0be336050c0f887f2
Parents: a48296f
Author: Josh Rosen <jo...@databricks.com>
Authored: Mon Mar 14 14:26:39 2016 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Mar 14 14:26:39 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala |  24 +-
 .../spark/storage/memory/MemoryStore.scala      | 361 +++++++++----------
 .../spark/storage/BlockManagerSuite.scala       |  77 ++--
 3 files changed, 224 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/38529d8f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b503410..d21df4b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -669,8 +669,15 @@ private[spark] class BlockManager(
       level: StorageLevel,
       tellMaster: Boolean = true): Boolean = {
     require(values != null, "Values is null")
-    // If doPut() didn't hand work back to us, then block already existed or was successfully stored
-    doPutIterator(blockId, () => values, level, tellMaster).isEmpty
+    doPutIterator(blockId, () => values, level, tellMaster) match {
+      case None =>
+        true
+      case Some(iter) =>
+        // Caller doesn't care about the iterator values, so we can close the iterator here
+        // to free resources earlier
+        iter.close()
+        false
+    }
   }
 
   /**
@@ -745,7 +752,14 @@ private[spark] class BlockManager(
         // We will drop it to disk later if the memory store can't hold it.
         val putSucceeded = if (level.deserialized) {
           val values = dataDeserialize(blockId, bytes.duplicate())
-          memoryStore.putIterator(blockId, values, level).isRight
+          memoryStore.putIterator(blockId, values, level) match {
+            case Right(_) => true
+            case Left(iter) =>
+              // If putting deserialized values in memory failed, we will put the bytes directly to
+              // disk, so we don't need this iterator and can close it to free resources earlier.
+              iter.close()
+              false
+          }
         } else {
           memoryStore.putBytes(blockId, size, () => bytes)
         }
@@ -857,10 +871,10 @@ private[spark] class BlockManager(
       iterator: () => Iterator[Any],
       level: StorageLevel,
       tellMaster: Boolean = true,
-      keepReadLock: Boolean = false): Option[Iterator[Any]] = {
+      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator] = {
     doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
       val startTimeMs = System.currentTimeMillis
-      var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
+      var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator] = None
       // Size of the block in bytes
       var size = 0L
       if (level.useMemory) {

http://git-wip-us.apache.org/repos/asf/spark/blob/38529d8f/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index a80b235..02d44dc 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.{Logging, SparkConf, TaskContext}
 import org.apache.spark.memory.MemoryManager
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}
-import org.apache.spark.util.{SizeEstimator, Utils}
+import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 
 private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
@@ -49,14 +49,6 @@ private[spark] class MemoryStore(
   // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
   // All accesses of this map are assumed to have manually synchronized on `memoryManager`
   private val unrollMemoryMap = mutable.HashMap[Long, Long]()
-  // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
-  // Pending unroll memory refers to the intermediate memory occupied by a task
-  // after the unroll but before the actual putting of the block in the cache.
-  // This chunk of memory is expected to be released *as soon as* we finish
-  // caching the corresponding block as opposed to until after the task finishes.
-  // This is only used if a block is successfully unrolled in its entirety in
-  // memory (SPARK-4777).
-  private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()
 
   // Initial memory to request before unrolling any block
   private val unrollMemoryThreshold: Long =
@@ -100,48 +92,151 @@ private[spark] class MemoryStore(
    */
   def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = {
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
-    // Work on a duplicate - since the original input might be used elsewhere.
-    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
-    val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
-    if (putSuccess) {
+    if (memoryManager.acquireStorageMemory(blockId, size)) {
+      // We acquired enough memory for the block, so go ahead and put it
+      // Work on a duplicate - since the original input might be used elsewhere.
+      val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
       assert(bytes.limit == size)
+      val entry = new MemoryEntry(bytes, size, deserialized = false)
+      entries.synchronized {
+        entries.put(blockId, entry)
+      }
+      logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
+        blockId, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
+      true
+    } else {
+      false
     }
-    putSuccess
   }
 
   /**
    * Attempt to put the given block in memory store.
    *
-   * @return the estimated size of the stored data if the put() succeeded, or an iterator
-   *         in case the put() failed (the returned iterator lets callers fall back to the disk
-   *         store if desired).
+   * It's possible that the iterator is too large to materialize and store in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
+   * whether there is enough free memory. If the block is successfully materialized, then the
+   * temporary unroll memory used during the materialization is "transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the block.
+   *
+   * @return in case of success, the estimated size of the stored data. In case of
+   *         failure, return an iterator containing the values of the block. The returned iterator
+   *         will be backed by the combination of the partially-unrolled block and the remaining
+   *         elements of the original input iterator. The caller must either fully consume this
+   *         iterator or call `close()` on it in order to free the storage memory consumed by the
+   *         partially-unrolled block.
    */
   private[storage] def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
-      level: StorageLevel): Either[Iterator[Any], Long] = {
+      level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = {
+
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
-    val unrolledValues = unrollSafely(blockId, values)
-    unrolledValues match {
-      case Left(arrayValues) =>
-        // Values are fully unrolled in memory, so store them as an array
-        if (level.deserialized) {
-          val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
-          if (tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)) {
-            Right(sizeEstimate)
-          } else {
-            Left(arrayValues.toIterator)
+
+    // Number of elements unrolled so far
+    var elementsUnrolled = 0
+    // Whether there is still enough memory for us to continue unrolling this block
+    var keepUnrolling = true
+    // Initial per-task memory to request for unrolling blocks (bytes).
+    val initialMemoryThreshold = unrollMemoryThreshold
+    // How often to check whether we need to request more memory
+    val memoryCheckPeriod = 16
+    // Memory currently reserved by this task for this particular unrolling operation
+    var memoryThreshold = initialMemoryThreshold
+    // Memory to request as a multiple of current vector size
+    val memoryGrowthFactor = 1.5
+    // Keep track of unroll memory used by this particular block / putIterator() operation
+    var unrollMemoryUsedByThisBlock = 0L
+    // Underlying vector for unrolling the block
+    var vector = new SizeTrackingVector[Any]
+
+    // Request enough memory to begin unrolling
+    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+
+    if (!keepUnrolling) {
+      logWarning(s"Failed to reserve initial memory threshold of " +
+        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
+    } else {
+      unrollMemoryUsedByThisBlock += initialMemoryThreshold
+    }
+
+    // Unroll this block safely, checking whether we have exceeded our threshold periodically
+    while (values.hasNext && keepUnrolling) {
+      vector += values.next()
+      if (elementsUnrolled % memoryCheckPeriod == 0) {
+        // If our vector's size has exceeded the threshold, request more memory
+        val currentSize = vector.estimateSize()
+        if (currentSize >= memoryThreshold) {
+          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
+          keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+          if (keepUnrolling) {
+            unrollMemoryUsedByThisBlock += amountToRequest
           }
-        } else {
-          val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
-          if (tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)) {
-            Right(bytes.limit())
-          } else {
-            Left(arrayValues.toIterator)
+          // New threshold is currentSize * memoryGrowthFactor
+          memoryThreshold += amountToRequest
+        }
+      }
+      elementsUnrolled += 1
+    }
+
+    if (keepUnrolling) {
+      // We successfully unrolled the entirety of this block
+      val arrayValues = vector.toArray
+      vector = null
+      val entry = if (level.deserialized) {
+        new MemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues), deserialized = true)
+      } else {
+        val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
+        new MemoryEntry(bytes, bytes.limit, deserialized = false)
+      }
+      val size = entry.size
+      def transferUnrollToStorage(amount: Long): Unit = {
+        // Synchronize so that transfer is atomic
+        memoryManager.synchronized {
+          releaseUnrollMemoryForThisTask(amount)
+          val success = memoryManager.acquireStorageMemory(blockId, amount)
+          assert(success, "transferring unroll memory to storage memory failed")
+        }
+      }
+      // Acquire storage memory if necessary to store this block in memory.
+      val enoughStorageMemory = {
+        if (unrollMemoryUsedByThisBlock <= size) {
+          val acquiredExtra =
+            memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock)
+          if (acquiredExtra) {
+            transferUnrollToStorage(unrollMemoryUsedByThisBlock)
           }
+          acquiredExtra
+        } else { // unrollMemoryUsedByThisBlock > size
+          // If this task attempt already owns more unroll memory than is necessary to store the
+          // block, then release the extra memory that will not be used.
+          val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
+          releaseUnrollMemoryForThisTask(excessUnrollMemory)
+          transferUnrollToStorage(size)
+          true
         }
-      case Right(iteratorValues) =>
-        Left(iteratorValues)
+      }
+      if (enoughStorageMemory) {
+        entries.synchronized {
+          entries.put(blockId, entry)
+        }
+        val bytesOrValues = if (level.deserialized) "values" else "bytes"
+        logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
+          blockId, bytesOrValues, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
+        Right(size)
+      } else {
+        assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
+          "released too much unroll memory")
+        Left(new PartiallyUnrolledIterator(
+          this,
+          unrollMemoryUsedByThisBlock,
+          unrolled = arrayValues.toIterator,
+          rest = Iterator.empty))
+      }
+    } else {
+      // We ran out of space while unrolling the values for this block
+      logUnrollFailureMessage(blockId, vector.estimateSize())
+      Left(new PartiallyUnrolledIterator(
+        this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values))
     }
   }
 
@@ -188,103 +283,11 @@ private[spark] class MemoryStore(
       entries.clear()
     }
     unrollMemoryMap.clear()
-    pendingUnrollMemoryMap.clear()
     memoryManager.releaseAllStorageMemory()
     logInfo("MemoryStore cleared")
   }
 
   /**
-   * Unroll the given block in memory safely.
-   *
-   * The safety of this operation refers to avoiding potential OOM exceptions caused by
-   * unrolling the entirety of the block in memory at once. This is achieved by periodically
-   * checking whether the memory restrictions for unrolling blocks are still satisfied,
-   * stopping immediately if not. This check is a safeguard against the scenario in which
-   * there is not enough free memory to accommodate the entirety of a single block.
-   *
-   * This method returns either an array with the contents of the entire block or an iterator
-   * containing the values of the block (if the array would have exceeded available memory).
-   */
-  def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = {
-
-    // Number of elements unrolled so far
-    var elementsUnrolled = 0
-    // Whether there is still enough memory for us to continue unrolling this block
-    var keepUnrolling = true
-    // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing.
-    val initialMemoryThreshold = unrollMemoryThreshold
-    // How often to check whether we need to request more memory
-    val memoryCheckPeriod = 16
-    // Memory currently reserved by this task for this particular unrolling operation
-    var memoryThreshold = initialMemoryThreshold
-    // Memory to request as a multiple of current vector size
-    val memoryGrowthFactor = 1.5
-    // Keep track of pending unroll memory reserved by this method.
-    var pendingMemoryReserved = 0L
-    // Underlying vector for unrolling the block
-    var vector = new SizeTrackingVector[Any]
-
-    // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
-
-    if (!keepUnrolling) {
-      logWarning(s"Failed to reserve initial memory threshold of " +
-        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
-    } else {
-      pendingMemoryReserved += initialMemoryThreshold
-    }
-
-    // Unroll this block safely, checking whether we have exceeded our threshold periodically
-    try {
-      while (values.hasNext && keepUnrolling) {
-        vector += values.next()
-        if (elementsUnrolled % memoryCheckPeriod == 0) {
-          // If our vector's size has exceeded the threshold, request more memory
-          val currentSize = vector.estimateSize()
-          if (currentSize >= memoryThreshold) {
-            val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
-            keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
-            if (keepUnrolling) {
-              pendingMemoryReserved += amountToRequest
-            }
-            // New threshold is currentSize * memoryGrowthFactor
-            memoryThreshold += amountToRequest
-          }
-        }
-        elementsUnrolled += 1
-      }
-
-      if (keepUnrolling) {
-        // We successfully unrolled the entirety of this block
-        Left(vector.toArray)
-      } else {
-        // We ran out of space while unrolling the values for this block
-        logUnrollFailureMessage(blockId, vector.estimateSize())
-        Right(vector.iterator ++ values)
-      }
-
-    } finally {
-      // If we return an array, the values returned here will be cached in `tryToPut` later.
-      // In this case, we should release the memory only after we cache the block there.
-      if (keepUnrolling) {
-        val taskAttemptId = currentTaskAttemptId()
-        memoryManager.synchronized {
-          // Since we continue to hold onto the array until we actually cache it, we cannot
-          // release the unroll memory yet. Instead, we transfer it to pending unroll memory
-          // so `tryToPut` can further transfer it to normal storage memory later.
-          // TODO: we can probably express this without pending unroll memory (SPARK-10907)
-          unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
-          pendingUnrollMemoryMap(taskAttemptId) =
-            pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
-        }
-      } else {
-        // Otherwise, if we return an iterator, we can only release the unroll memory when
-        // the task finishes since we don't know when the iterator will be consumed.
-      }
-    }
-  }
-
-  /**
    * Return the RDD ID that a given block ID is from, or None if it is not an RDD block.
    */
   private def getRddId(blockId: BlockId): Option[Int] = {
@@ -292,48 +295,6 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Try to put in a set of values, if we can free up enough space. The value should either be
-   * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
-   * must also be passed by the caller.
-   *
-   * @return whether put was successful.
-   */
-  private def tryToPut(
-      blockId: BlockId,
-      value: () => Any,
-      size: Long,
-      deserialized: Boolean): Boolean = {
-    val acquiredEnoughStorageMemory = {
-      // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another
-      // task.
-      memoryManager.synchronized {
-        // Note: if we have previously unrolled this block successfully, then pending unroll
-        // memory should be non-zero. This is the amount that we already reserved during the
-        // unrolling process. In this case, we can just reuse this space to cache our block.
-        // The synchronization on `memoryManager` here guarantees that the release and acquire
-        // happen atomically. This relies on the assumption that all memory acquisitions are
-        // synchronized on the same lock.
-        releasePendingUnrollMemoryForThisTask()
-        memoryManager.acquireStorageMemory(blockId, size)
-      }
-    }
-
-    if (acquiredEnoughStorageMemory) {
-      // We acquired enough memory for the block, so go ahead and put it
-      val entry = new MemoryEntry(value(), size, deserialized)
-      entries.synchronized {
-        entries.put(blockId, entry)
-      }
-      val valuesOrBytes = if (deserialized) "values" else "bytes"
-      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
-        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
-      true
-    } else {
-      false
-    }
-  }
-
-  /**
     * Try to evict blocks to free up a given amount of space to store a particular block.
     * Can fail if either the block is bigger than our memory or it would require replacing
     * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
@@ -457,29 +418,10 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Release pending unroll memory of current unroll successful block used by this task
-   */
-  def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
-    val taskAttemptId = currentTaskAttemptId()
-    memoryManager.synchronized {
-      if (pendingUnrollMemoryMap.contains(taskAttemptId)) {
-        val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
-        if (memoryToRelease > 0) {
-          pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease
-          if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
-            pendingUnrollMemoryMap.remove(taskAttemptId)
-          }
-          memoryManager.releaseUnrollMemory(memoryToRelease)
-        }
-      }
-    }
-  }
-
-  /**
    * Return the amount of memory currently occupied for unrolling blocks across all tasks.
    */
   def currentUnrollMemory: Long = memoryManager.synchronized {
-    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
+    unrollMemoryMap.values.sum
   }
 
   /**
@@ -520,3 +462,42 @@ private[spark] class MemoryStore(
     logMemoryUsage()
   }
 }
+
+/**
+ * The result of a failed [[MemoryStore.putIterator()]] call.
+ *
+ * @param memoryStore the memoryStore, used for freeing memory.
+ * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
+ * @param unrolled an iterator for the partially-unrolled values.
+ * @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]].
+ */
+private[storage] class PartiallyUnrolledIterator(
+    memoryStore: MemoryStore,
+    unrollMemory: Long,
+    unrolled: Iterator[Any],
+    rest: Iterator[Any])
+  extends Iterator[Any] {
+
+  private[this] var unrolledIteratorIsConsumed: Boolean = false
+  private[this] var iter: Iterator[Any] = {
+    val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, {
+      unrolledIteratorIsConsumed = true
+      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+    })
+    completionIterator ++ rest
+  }
+
+  override def hasNext: Boolean = iter.hasNext
+  override def next(): Any = iter.next()
+
+  /**
+   * Called to dispose of this iterator and free its memory.
+   */
+  def close(): Unit = {
+    if (!unrolledIteratorIsConsumed) {
+      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+      unrolledIteratorIsConsumed = true
+    }
+    iter = null
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/38529d8f/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index dc4be14..2e0c059 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1065,28 +1065,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
   }
 
-  /**
-   * Verify the result of MemoryStore#unrollSafely is as expected.
-   */
-  private def verifyUnroll(
-      expected: Iterator[Any],
-      result: Either[Array[Any], Iterator[Any]],
-      shouldBeArray: Boolean): Unit = {
-    val actual: Iterator[Any] = result match {
-      case Left(arr: Array[Any]) =>
-        assert(shouldBeArray, "expected iterator from unroll!")
-        arr.iterator
-      case Right(it: Iterator[Any]) =>
-        assert(!shouldBeArray, "expected array from unroll!")
-        it
-      case _ =>
-        fail("unroll returned neither an iterator nor an array...")
-    }
-    expected.zip(actual).foreach { case (e, a) =>
-      assert(e === a, "unroll did not return original values!")
-    }
-  }
-
   test("safely unroll blocks") {
     store = makeBlockManager(12000)
     val smallList = List.fill(40)(new Array[Byte](100))
@@ -1094,30 +1072,41 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val memoryStore = store.memoryStore
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    // Unroll with all the space in the world. This should succeed and return an array.
-    var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
-    verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+    // Unroll with all the space in the world. This should succeed.
+    var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    assert(putResult.isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-    memoryStore.releasePendingUnrollMemoryForThisTask()
+    smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
+      assert(e === a, "getValues() did not return original values!")
+    }
+    assert(memoryStore.remove("unroll"))
 
     // Unroll with not enough space. This should succeed after kicking out someBlock1.
-    store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
-    verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+    assert(store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY))
+    assert(store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY))
+    putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    assert(putResult.isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     assert(memoryStore.contains("someBlock2"))
     assert(!memoryStore.contains("someBlock1"))
-    memoryStore.releasePendingUnrollMemoryForThisTask()
+    smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
+      assert(e === a, "getValues() did not return original values!")
+    }
+    assert(memoryStore.remove("unroll"))
 
     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
    // In the meantime, however, we kicked out someBlock2 before giving up.
-    store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
-    verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
+    assert(store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY))
+    putResult = memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
     assert(!memoryStore.contains("someBlock2"))
+    assert(putResult.isLeft)
+    bigList.iterator.zip(putResult.left.get).foreach { case (e, a) =>
+      assert(e === a, "putIterator() did not return original values!")
+    }
+    // The unroll memory was freed once the iterator returned by putIterator() was fully traversed.
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
   }
 
   test("safely unroll blocks through putIterator") {
@@ -1208,6 +1197,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.contains("b3"))
     assert(!memoryStore.contains("b4"))
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
+    result4.left.get.close()
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory
   }
 
   test("multiple unrolls by the same thread") {
@@ -1218,29 +1209,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    // All unroll memory used is released because unrollSafely returned an array
-    memoryStore.putIterator("b1", smallIterator, memOnly)
+    // All unroll memory used is released because putIterator did not return an iterator
+    assert(memoryStore.putIterator("b1", smallIterator, memOnly).isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-    memoryStore.putIterator("b2", smallIterator, memOnly)
+    assert(memoryStore.putIterator("b2", smallIterator, memOnly).isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    // Unroll memory is not released because unrollSafely returned an iterator
+    // Unroll memory is not released because putIterator returned an iterator
     // that still depends on the underlying vector used in the process
-    memoryStore.putIterator("b3", smallIterator, memOnly)
+    assert(memoryStore.putIterator("b3", smallIterator, memOnly).isLeft)
     val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB3 > 0)
 
     // The unroll memory owned by this thread builds on top of its value after the previous unrolls
-    memoryStore.putIterator("b4", smallIterator, memOnly)
+    assert(memoryStore.putIterator("b4", smallIterator, memOnly).isLeft)
     val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
 
     // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
-    memoryStore.putIterator("b5", smallIterator, memOnly)
+    assert(memoryStore.putIterator("b5", smallIterator, memOnly).isLeft)
     val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
-    memoryStore.putIterator("b6", smallIterator, memOnly)
+    assert(memoryStore.putIterator("b6", smallIterator, memOnly).isLeft)
     val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
-    memoryStore.putIterator("b7", smallIterator, memOnly)
+    assert(memoryStore.putIterator("b7", smallIterator, memOnly).isLeft)
     val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org