You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/01/18 22:34:16 UTC

spark git commit: [SPARK-10985][CORE] Avoid passing evicted blocks throughout BlockManager

Repository: spark
Updated Branches:
  refs/heads/master 302bb569f -> b8cb548a4


[SPARK-10985][CORE] Avoid passing evicted blocks throughout BlockManager

This patch refactors portions of the BlockManager and CacheManager in order to avoid having to pass `evictedBlocks` lists throughout the code. It appears that these lists were only consumed by `TaskContext.taskMetrics`, so the new code now directly updates the metrics from the lower-level BlockManager methods.

Author: Josh Rosen <jo...@databricks.com>

Closes #10776 from JoshRosen/SPARK-10985.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8cb548a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8cb548a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8cb548a

Branch: refs/heads/master
Commit: b8cb548a4394221f2b029c84c7f130774da69e3a
Parents: 302bb56
Author: Josh Rosen <jo...@databricks.com>
Authored: Mon Jan 18 13:34:12 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Jan 18 13:34:12 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   | 20 ++---
 .../org/apache/spark/memory/MemoryManager.scala | 18 +----
 .../spark/memory/StaticMemoryManager.scala      | 18 ++---
 .../apache/spark/memory/StorageMemoryPool.scala | 30 ++------
 .../spark/memory/UnifiedMemoryManager.scala     | 18 ++---
 .../org/apache/spark/storage/BlockManager.scala | 71 +++++++++---------
 .../org/apache/spark/storage/MemoryStore.scala  | 77 +++++++-------------
 .../org/apache/spark/CacheManagerSuite.scala    |  9 ++-
 .../spark/memory/MemoryManagerSuite.scala       | 23 ++----
 .../spark/memory/StaticMemoryManagerSuite.scala | 24 +++---
 .../apache/spark/memory/TestMemoryManager.scala | 10 +--
 .../memory/UnifiedMemoryManagerSuite.scala      | 30 ++++----
 .../spark/storage/BlockManagerSuite.scala       | 55 ++++++++------
 .../receiver/ReceivedBlockHandler.scala         |  8 +-
 14 files changed, 170 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 36b536e..d92d8b0 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -18,7 +18,6 @@
 package org.apache.spark
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
@@ -68,12 +67,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           logInfo(s"Partition $key not found, computing it")
           val computedValues = rdd.computeOrReadCheckpoint(partition, context)
 
-          // Otherwise, cache the values and keep track of any updates in block statuses
-          val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-          val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
-          val metrics = context.taskMetrics
-          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
-          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
+          // Otherwise, cache the values
+          val cachedValues = putInBlockManager(key, computedValues, storageLevel)
           new InterruptibleIterator(context, cachedValues)
 
         } finally {
@@ -135,7 +130,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       key: BlockId,
       values: Iterator[T],
       level: StorageLevel,
-      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
       effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
 
     val putLevel = effectiveStorageLevel.getOrElse(level)
@@ -144,8 +138,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
        * This RDD is not to be cached in memory, so we can just pass the computed values as an
        * iterator directly to the BlockManager rather than first fully unrolling it in memory.
        */
-      updatedBlocks ++=
-        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
+      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
       blockManager.get(key) match {
         case Some(v) => v.data.asInstanceOf[Iterator[T]]
         case None =>
@@ -163,11 +156,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
        * single partition. Instead, we unroll the values cautiously, potentially aborting and
        * dropping the partition to disk if applicable.
        */
-      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
+      blockManager.memoryStore.unrollSafely(key, values) match {
         case Left(arr) =>
           // We have successfully unrolled the entire partition, so cache it in memory
-          updatedBlocks ++=
-            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
+          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
           arr.iterator.asInstanceOf[Iterator[T]]
         case Right(it) =>
           // There is not enough space to cache this partition in memory
@@ -176,7 +168,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
             logWarning(s"Persisting partition $key to disk instead.")
             val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
               useOffHeap = false, deserialized = false, putLevel.replication)
-            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
+            putInBlockManager[T](key, returnValues, level, Some(diskOnlyLevel))
           } else {
             returnValues
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 33f8b9f..b5adbd8 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -19,10 +19,8 @@ package org.apache.spark.memory
 
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable
-
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
+import org.apache.spark.storage.{BlockId, MemoryStore}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.memory.MemoryAllocator
 
@@ -67,17 +65,11 @@ private[spark] abstract class MemoryManager(
     storageMemoryPool.setMemoryStore(store)
   }
 
-  // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985)
-
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
-   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
    * @return whether all N bytes were successfully granted.
    */
-  def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+  def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean
 
   /**
    * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
@@ -85,14 +77,10 @@ private[spark] abstract class MemoryManager(
    * This extra method allows subclasses to differentiate behavior between acquiring storage
    * memory and acquiring unroll memory. For instance, the memory management model in Spark
    * 1.5 and before places a limit on the amount of space that can be freed from unrolling.
-   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
    *
    * @return whether all N bytes were successfully granted.
    */
-  def acquireUnrollMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+  def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean
 
   /**
    * Try to acquire up to `numBytes` of execution memory for the current task and return the

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 3554b55..f9f8f82 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.memory
 
-import scala.collection.mutable
-
 import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.storage.BlockId
 
 /**
  * A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
@@ -53,24 +51,18 @@ private[spark] class StaticMemoryManager(
     (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
   }
 
-  override def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+  override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
     if (numBytes > maxStorageMemory) {
       // Fail fast if the block simply won't fit
       logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
         s"memory limit ($maxStorageMemory bytes)")
       false
     } else {
-      storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+      storageMemoryPool.acquireMemory(blockId, numBytes)
     }
   }
 
-  override def acquireUnrollMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+  override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
     val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
     val freeMemory = storageMemoryPool.memoryFree
     // When unrolling, we will use all of the existing free memory, and, if necessary,
@@ -80,7 +72,7 @@ private[spark] class StaticMemoryManager(
     val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
     // Keep it within the range 0 <= X <= maxNumBytesToFree
     val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
-    storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
+    storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
   }
 
   private[memory]

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 4036484..6a88966 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -19,11 +19,8 @@ package org.apache.spark.memory
 
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
+import org.apache.spark.Logging
+import org.apache.spark.storage.{BlockId, MemoryStore}
 
 /**
  * Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
@@ -58,15 +55,11 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
 
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
-   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
    * @return whether all N bytes were successfully granted.
    */
-  def acquireMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
+  def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
     val numBytesToFree = math.max(0, numBytes - memoryFree)
-    acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
+    acquireMemory(blockId, numBytes, numBytesToFree)
   }
 
   /**
@@ -80,19 +73,12 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
   def acquireMemory(
       blockId: BlockId,
       numBytesToAcquire: Long,
-      numBytesToFree: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
+      numBytesToFree: Long): Boolean = lock.synchronized {
     assert(numBytesToAcquire >= 0)
     assert(numBytesToFree >= 0)
     assert(memoryUsed <= poolSize)
     if (numBytesToFree > 0) {
-      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)
-      // Register evicted blocks, if any, with the active task metrics
-      Option(TaskContext.get()).foreach { tc =>
-        val metrics = tc.taskMetrics()
-        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
-        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
-      }
+      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
     }
     // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
     // back into this StorageMemoryPool in order to free memory. Therefore, these variables
@@ -129,9 +115,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
     val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
     if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
-      val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-      memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks)
-      val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
+      val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
       // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
       // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
       decrementPoolSize(spaceFreedByEviction)

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 57a24ac..a3321e3 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.memory
 
-import scala.collection.mutable
-
 import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.storage.BlockId
 
 /**
  * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
@@ -133,10 +131,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
     }
   }
 
-  override def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+  override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
     assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
     assert(numBytes >= 0)
     if (numBytes > maxStorageMemory) {
@@ -152,14 +147,11 @@ private[spark] class UnifiedMemoryManager private[memory] (
       onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
       storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
     }
-    storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+    storageMemoryPool.acquireMemory(blockId, numBytes)
   }
 
-  override def acquireUnrollMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
-    acquireStorageMemory(blockId, numBytes, evictedBlocks)
+  override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
+    acquireStorageMemory(blockId, numBytes)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e49d79b..e0a8e88 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -612,12 +612,16 @@ private[spark] class BlockManager(
     None
   }
 
+  /**
+   * @return true if the block was stored or false if the block was already stored or an
+   *         error occurred.
+   */
   def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
       tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
+      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(values != null, "Values is null")
     doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
   }
@@ -641,28 +645,32 @@ private[spark] class BlockManager(
 
   /**
    * Put a new block of values to the block manager.
-   * Return a list of blocks updated as a result of this put.
+   *
+   * @return true if the block was stored or false if the block was already stored or an
+   *         error occurred.
    */
   def putArray(
       blockId: BlockId,
       values: Array[Any],
       level: StorageLevel,
       tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
+      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(values != null, "Values is null")
     doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
   }
 
   /**
    * Put a new block of serialized bytes to the block manager.
-   * Return a list of blocks updated as a result of this put.
+   *
+   * @return true if the block was stored or false if the block was already stored or an
+   *         error occurred.
    */
   def putBytes(
       blockId: BlockId,
       bytes: ByteBuffer,
       level: StorageLevel,
       tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
+      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(bytes != null, "Bytes is null")
     doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
   }
@@ -674,14 +682,16 @@ private[spark] class BlockManager(
    * The effective storage level refers to the level according to which the block will actually be
    * handled. This allows the caller to specify an alternate behavior of doPut while preserving
    * the original level specified by the user.
+   *
+   * @return true if the block was stored or false if the block was already stored or an
+   *         error occurred.
    */
   private def doPut(
       blockId: BlockId,
       data: BlockValues,
       level: StorageLevel,
       tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None)
-    : Seq[(BlockId, BlockStatus)] = {
+      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -689,9 +699,6 @@ private[spark] class BlockManager(
       require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
     }
 
-    // Return value
-    val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-
     /* Remember the block's storage level so that we can correctly drop it to disk if it needs
      * to be dropped right after it got put into memory. Note, however, that other threads will
      * not be able to get() this block until we call markReady on its BlockInfo. */
@@ -702,7 +709,7 @@ private[spark] class BlockManager(
       if (oldBlockOpt.isDefined) {
         if (oldBlockOpt.get.waitForReady()) {
           logWarning(s"Block $blockId already exists on this machine; not re-adding it")
-          return updatedBlocks
+          return false
         }
         // TODO: So the block info exists - but previous attempt to load it (?) failed.
         // What do we do now ? Retry on it ?
@@ -743,11 +750,12 @@ private[spark] class BlockManager(
       case _ => null
     }
 
+    var marked = false
+
     putBlockInfo.synchronized {
       logTrace("Put for block %s took %s to get into synchronized block"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
 
-      var marked = false
       try {
         // returnValues - Whether to return the values put
         // blockStore - The type of storage to put these values into
@@ -783,11 +791,6 @@ private[spark] class BlockManager(
           case _ =>
         }
 
-        // Keep track of which blocks are dropped from memory
-        if (putLevel.useMemory) {
-          result.droppedBlocks.foreach { updatedBlocks += _ }
-        }
-
         val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
         if (putBlockStatus.storageLevel != StorageLevel.NONE) {
           // Now that the block is in either the memory, externalBlockStore, or disk store,
@@ -797,7 +800,11 @@ private[spark] class BlockManager(
           if (tellMaster) {
             reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
           }
-          updatedBlocks += ((blockId, putBlockStatus))
+          Option(TaskContext.get()).foreach { taskContext =>
+            val metrics = taskContext.taskMetrics()
+            val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+            metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, putBlockStatus)))
+          }
         }
       } finally {
         // If we failed in putting the block to memory/disk, notify other possible readers
@@ -847,7 +854,7 @@ private[spark] class BlockManager(
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     }
 
-    updatedBlocks
+    marked
   }
 
   /**
@@ -967,32 +974,27 @@ private[spark] class BlockManager(
 
   /**
    * Write a block consisting of a single object.
+   *
+   * @return true if the block was stored or false if the block was already stored or an
+   *         error occurred.
    */
   def putSingle(
       blockId: BlockId,
       value: Any,
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true): Boolean = {
     putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
-  def dropFromMemory(
-      blockId: BlockId,
-      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
-    dropFromMemory(blockId, () => data)
-  }
-
   /**
    * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
    * store reaches its limit and needs to free up space.
    *
    * If `data` is not put on disk, it won't be created.
-   *
-   * Return the block status if the given block has been updated, else None.
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: () => Either[Array[Any], ByteBuffer]): Unit = {
 
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId)
@@ -1005,10 +1007,10 @@ private[spark] class BlockManager(
         if (!info.waitForReady()) {
           // If we get here, the block write failed.
           logWarning(s"Block $blockId was marked as failure. Nothing to drop")
-          return None
+          return
         } else if (blockInfo.asScala.get(blockId).isEmpty) {
           logWarning(s"Block $blockId was already dropped.")
-          return None
+          return
         }
         var blockIsUpdated = false
         val level = info.level
@@ -1044,11 +1046,14 @@ private[spark] class BlockManager(
           blockInfo.remove(blockId)
         }
         if (blockIsUpdated) {
-          return Some(status)
+          Option(TaskContext.get()).foreach { taskContext =>
+            val metrics = taskContext.taskMetrics()
+            val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+            metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
+          }
         }
       }
     }
-    None
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index bdab8c2..76aaa78 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -95,9 +95,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       val values = blockManager.dataDeserialize(blockId, bytes)
       putIterator(blockId, values, level, returnValues = true)
     } else {
-      val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-      tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
-      PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
+      tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+      PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
 
@@ -110,8 +109,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
   def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
     // Work on a duplicate - since the original input might be used elsewhere.
     lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
-    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-    val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false, droppedBlocks)
+    val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
     val data =
       if (putSuccess) {
         assert(bytes.limit == size)
@@ -119,7 +117,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       } else {
         null
       }
-    PutResult(size, data, droppedBlocks)
+    PutResult(size, data)
   }
 
   override def putArray(
@@ -127,15 +125,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     if (level.deserialized) {
       val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
-      tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks)
-      PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
+      tryToPut(blockId, values, sizeEstimate, deserialized = true)
+      PutResult(sizeEstimate, Left(values.iterator))
     } else {
       val bytes = blockManager.dataSerialize(blockId, values.iterator)
-      tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
-      PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
+      tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+      PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
 
@@ -165,22 +162,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       level: StorageLevel,
       returnValues: Boolean,
       allowPersistToDisk: Boolean): PutResult = {
-    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-    val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
+    val unrolledValues = unrollSafely(blockId, values)
     unrolledValues match {
       case Left(arrayValues) =>
         // Values are fully unrolled in memory, so store them as an array
         val res = putArray(blockId, arrayValues, level, returnValues)
-        droppedBlocks ++= res.droppedBlocks
-        PutResult(res.size, res.data, droppedBlocks)
+        PutResult(res.size, res.data)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable
         if (level.useDisk && allowPersistToDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
           val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
-          PutResult(res.size, res.data, droppedBlocks)
+          PutResult(res.size, res.data)
         } else {
-          PutResult(0, Left(iteratorValues), droppedBlocks)
+          PutResult(0, Left(iteratorValues))
         }
     }
   }
@@ -246,11 +241,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    * This method returns either an array with the contents of the entire block or an iterator
    * containing the values of the block (if the array would have exceeded available memory).
    */
-  def unrollSafely(
-      blockId: BlockId,
-      values: Iterator[Any],
-      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
-    : Either[Array[Any], Iterator[Any]] = {
+  def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = {
 
     // Number of elements unrolled so far
     var elementsUnrolled = 0
@@ -270,7 +261,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     var vector = new SizeTrackingVector[Any]
 
     // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, droppedBlocks)
+    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
 
     if (!keepUnrolling) {
       logWarning(s"Failed to reserve initial memory threshold of " +
@@ -286,8 +277,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
           val currentSize = vector.estimateSize()
           if (currentSize >= memoryThreshold) {
             val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
-            keepUnrolling = reserveUnrollMemoryForThisTask(
-              blockId, amountToRequest, droppedBlocks)
+            keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
             // New threshold is currentSize * memoryGrowthFactor
             memoryThreshold += amountToRequest
           }
@@ -337,9 +327,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       blockId: BlockId,
       value: Any,
       size: Long,
-      deserialized: Boolean,
-      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
-    tryToPut(blockId, () => value, size, deserialized, droppedBlocks)
+      deserialized: Boolean): Boolean = {
+    tryToPut(blockId, () => value, size, deserialized)
   }
 
   /**
@@ -355,16 +344,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    * blocks to free memory for one block, another thread may use up the freed space for
    * another block.
    *
-   * All blocks evicted in the process, if any, will be added to `droppedBlocks`.
-   *
    * @return whether put was successful.
    */
   private def tryToPut(
       blockId: BlockId,
       value: () => Any,
       size: Long,
-      deserialized: Boolean,
-      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+      deserialized: Boolean): Boolean = {
 
     /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
      * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
@@ -380,7 +366,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       // happen atomically. This relies on the assumption that all memory acquisitions are
       // synchronized on the same lock.
       releasePendingUnrollMemoryForThisTask()
-      val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
+      val enoughMemory = memoryManager.acquireStorageMemory(blockId, size)
       if (enoughMemory) {
         // We acquired enough memory for the block, so go ahead and put it
         val entry = new MemoryEntry(value(), size, deserialized)
@@ -398,8 +384,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
         } else {
           Right(value().asInstanceOf[ByteBuffer].duplicate())
         }
-        val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
-        droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
+        blockManager.dropFromMemory(blockId, () => data)
       }
       enoughMemory
     }
@@ -413,13 +398,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     *
     * @param blockId the ID of the block we are freeing space for, if any
     * @param space the size of this block
-    * @param droppedBlocks a holder for blocks evicted in the process
-    * @return whether the requested free space is freed.
+    * @return the amount of memory (in bytes) freed by eviction
     */
-  private[spark] def evictBlocksToFreeSpace(
-      blockId: Option[BlockId],
-      space: Long,
-      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+  private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = {
     assert(space > 0)
     memoryManager.synchronized {
       var freedMemory = 0L
@@ -453,17 +434,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
             } else {
               Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
             }
-            val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
-            droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
+            blockManager.dropFromMemory(blockId, () => data)
           }
         }
-        true
+        freedMemory
       } else {
         blockId.foreach { id =>
           logInfo(s"Will not store $id as it would require dropping another block " +
             "from the same RDD")
         }
-        false
+        0L
       }
     }
   }
@@ -481,12 +461,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    * Reserve memory for unrolling the given block for this task.
    * @return whether the request is granted.
    */
-  def reserveUnrollMemoryForThisTask(
-      blockId: BlockId,
-      memory: Long,
-      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+  def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
     memoryManager.synchronized {
-      val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks)
+      val success = memoryManager.acquireUnrollMemory(blockId, memory)
       if (success) {
         val taskAttemptId = currentTaskAttemptId()
         unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 30aa94c..3865c20 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -85,7 +85,12 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
   test("verify task metrics updated correctly") {
     cacheManager = sc.env.cacheManager
     val context = TaskContext.empty()
-    cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
-    assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
+    try {
+      TaskContext.setTaskContext(context)
+      cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
+      assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
+    } finally {
+      TaskContext.unset()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 3b23687..d9764c7 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -70,8 +70,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
    */
   protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
     val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
-    when(ms.evictBlocksToFreeSpace(any(), anyLong(), any()))
-      .thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
+    when(ms.evictBlocksToFreeSpace(any(), anyLong())).thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
     mm.setMemoryStore(ms)
     ms
   }
@@ -89,9 +88,9 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
     * records the number of bytes this is called with. This variable is expected to be cleared
     * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]].
     */
-  private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = {
-    new Answer[Boolean] {
-      override def answer(invocation: InvocationOnMock): Boolean = {
+  private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Long] = {
+    new Answer[Long] {
+      override def answer(invocation: InvocationOnMock): Long = {
         val args = invocation.getArguments
         val numBytesToFree = args(1).asInstanceOf[Long]
         assert(numBytesToFree > 0)
@@ -101,20 +100,12 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
         if (numBytesToFree <= mm.storageMemoryUsed) {
           // We can evict enough blocks to fulfill the request for space
           mm.releaseStorageMemory(numBytesToFree)
-          args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
+          evictedBlocks.append(
             (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
-          // We need to add this call so that that the suite-level `evictedBlocks` is updated when
-          // execution evicts storage; in that case, args.last will not be equal to evictedBlocks
-          // because it will be a temporary buffer created inside of the MemoryManager rather than
-          // being passed in by the test code.
-          if (!(evictedBlocks eq args.last)) {
-            evictedBlocks.append(
-              (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
-          }
-          true
+          numBytesToFree
         } else {
           // No blocks were evicted because eviction would not free enough space.
-          false
+          0L
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 68cf26f..eee78d3 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -81,22 +81,22 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     val dummyBlock = TestBlockId("you can see the world you brought to live")
     val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
     assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 10L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 10L)
 
-    assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 100L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire more than the max, not granted
-    assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks))
+    assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire up to the max, requests after this are still granted due to LRU eviction
-    assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem))
     assertEvictBlocksToFreeSpaceCalled(ms, 110L)
     assert(mm.storageMemoryUsed === 1000L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L))
     assertEvictBlocksToFreeSpaceCalled(ms, 1L)
     assert(evictedBlocks.nonEmpty)
     evictedBlocks.clear()
@@ -107,12 +107,12 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     mm.releaseStorageMemory(800L)
     assert(mm.storageMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 201L)
     mm.releaseAllStorageMemory()
     assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 1L)
     // Release beyond what was acquired
@@ -134,7 +134,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.executionMemoryUsed === 200L)
     // Only storage memory should increase
-    assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 50L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 50L)
     assert(mm.executionMemoryUsed === 200L)
@@ -152,21 +152,21 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     val maxStorageMem = 1000L
     val dummyBlock = TestBlockId("lonely water")
     val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
-    assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
+    assert(mm.acquireUnrollMemory(dummyBlock, 100L))
     when(ms.currentUnrollMemory).thenReturn(100L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 100L)
     mm.releaseUnrollMemory(40L)
     assert(mm.storageMemoryUsed === 60L)
     when(ms.currentUnrollMemory).thenReturn(60L)
-    assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 800L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 860L)
     // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
     // As of this point, cache memory is 800 bytes and current unroll memory is 60 bytes.
     // Requesting 240 more bytes of unroll memory will leave our total unroll memory at
     // 300 bytes, still under the 400-byte limit. Therefore, all 240 bytes are granted.
-    assert(mm.acquireUnrollMemory(dummyBlock, 240L, evictedBlocks))
+    assert(mm.acquireUnrollMemory(dummyBlock, 240L))
     assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000
     when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240
     assert(mm.storageMemoryUsed === 1000L)
@@ -174,7 +174,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     // We already have 300 bytes of unroll memory, so requesting 150 more will leave us
     // above the 400-byte limit. Since there is not enough free memory, this request will
     // fail even after evicting as much as we can (400 - 300 = 100 bytes).
-    assert(!mm.acquireUnrollMemory(dummyBlock, 150L, evictedBlocks))
+    assert(!mm.acquireUnrollMemory(dummyBlock, 150L))
     assertEvictBlocksToFreeSpaceCalled(ms, 100L)
     assert(mm.storageMemoryUsed === 900L)
     // Release beyond what was acquired

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 4a1e49b..e5cb9d3 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -41,14 +41,8 @@ class TestMemoryManager(conf: SparkConf)
       grant
     }
   }
-  override def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
-  override def acquireUnrollMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
+  override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = true
+  override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = true
   override def releaseStorageMemory(numBytes: Long): Unit = {}
   override private[memory] def releaseExecutionMemory(
       numBytes: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 6cc4859..0c4359c 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -74,24 +74,24 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val maxMemory = 1000L
     val (mm, ms) = makeThings(maxMemory)
     assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 10L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 10L)
 
-    assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 100L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire more than the max, not granted
-    assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
+    assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire up to the max, requests after this are still granted due to LRU eviction
-    assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, maxMemory))
     assertEvictBlocksToFreeSpaceCalled(ms, 110L)
     assert(mm.storageMemoryUsed === 1000L)
     assert(evictedBlocks.nonEmpty)
     evictedBlocks.clear()
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L))
     assertEvictBlocksToFreeSpaceCalled(ms, 1L)
     assert(evictedBlocks.nonEmpty)
     evictedBlocks.clear()
@@ -102,12 +102,12 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     mm.releaseStorageMemory(800L)
     assert(mm.storageMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 201L)
     mm.releaseAllStorageMemory()
     assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 1L)
     // Release beyond what was acquired
@@ -120,7 +120,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val taskAttemptId = 0L
     val (mm, ms) = makeThings(maxMemory)
     // Acquire enough storage memory to exceed the storage region
-    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 750L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.executionMemoryUsed === 0L)
     assert(mm.storageMemoryUsed === 750L)
@@ -140,7 +140,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     require(mm.executionMemoryUsed === 300L)
     require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
     // Acquire some storage memory again, but this time keep it within the storage region
-    assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 400L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 400L)
     assert(mm.executionMemoryUsed === 300L)
@@ -157,7 +157,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val taskAttemptId = 0L
     val (mm, ms) = makeThings(maxMemory)
     // Acquire enough storage memory to exceed the storage region size
-    assert(mm.acquireStorageMemory(dummyBlock, 700L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 700L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.executionMemoryUsed === 0L)
     assert(mm.storageMemoryUsed === 700L)
@@ -182,11 +182,11 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     assert(mm.storageMemoryUsed === 0L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     // Storage should not be able to evict execution
-    assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 100L))
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 100L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
+    assert(!mm.acquireStorageMemory(dummyBlock, 250L))
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 100L)
     // Do not attempt to evict blocks, since evicting will not free enough memory:
@@ -199,11 +199,11 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     assert(mm.storageMemoryUsed === 0L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     // Storage should still not be able to evict execution
-    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 750L))
     assert(mm.executionMemoryUsed === 200L)
     assert(mm.storageMemoryUsed === 750L)
     assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes free
-    assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
+    assert(!mm.acquireStorageMemory(dummyBlock, 850L))
     assert(mm.executionMemoryUsed === 200L)
     assert(mm.storageMemoryUsed === 750L)
     // Do not attempt to evict blocks, since evicting will not free enough memory:
@@ -243,7 +243,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
     assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
     // Fill up all of the remaining memory with storage.
-    assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 800L))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 800)
     assert(mm.executionMemoryUsed === 200)

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0f31561..6e6cf63 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -184,8 +184,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("a3").size === 0, "master was told about a3")
 
     // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
-    store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
     assert(store.getSingle("a1") === None, "a1 not removed from store")
     assert(store.getSingle("a2") === None, "a2 not removed from store")
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -425,8 +425,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       t2.join()
       t3.join()
 
-      store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
-      store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
       store.waitForAsyncReregister()
     }
   }
@@ -847,23 +847,37 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](2000))
     val bigList = List.fill(8)(new Array[Byte](2000))
 
+    def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = {
+      val context = TaskContext.empty()
+      try {
+        TaskContext.setTaskContext(context)
+        task
+      } finally {
+        TaskContext.unset()
+      }
+      context.taskMetrics.updatedBlocks.getOrElse(Seq.empty)
+    }
+
     // 1 updated block (i.e. list1)
-    val updatedBlocks1 =
+    val updatedBlocks1 = getUpdatedBlocks {
       store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    }
     assert(updatedBlocks1.size === 1)
     assert(updatedBlocks1.head._1 === TestBlockId("list1"))
     assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
 
     // 1 updated block (i.e. list2)
-    val updatedBlocks2 =
+    val updatedBlocks2 = getUpdatedBlocks {
       store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    }
     assert(updatedBlocks2.size === 1)
     assert(updatedBlocks2.head._1 === TestBlockId("list2"))
     assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
 
     // 2 updated blocks - list1 is kicked out of memory while list3 is added
-    val updatedBlocks3 =
+    val updatedBlocks3 = getUpdatedBlocks {
       store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    }
     assert(updatedBlocks3.size === 2)
     updatedBlocks3.foreach { case (id, status) =>
       id match {
@@ -875,8 +889,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
 
     // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
-    val updatedBlocks4 =
+    val updatedBlocks4 = getUpdatedBlocks {
       store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    }
     assert(updatedBlocks4.size === 2)
     updatedBlocks4.foreach { case (id, status) =>
       id match {
@@ -889,8 +904,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
 
     // No updated blocks - list5 is too big to fit in store and nothing is kicked out
-    val updatedBlocks5 =
+    val updatedBlocks5 = getUpdatedBlocks {
       store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    }
     assert(updatedBlocks5.size === 0)
 
     // memory store contains only list3 and list4
@@ -1005,8 +1021,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
-      memoryStore.reserveUnrollMemoryForThisTask(
-        TestBlockId(""), memory, new ArrayBuffer[(BlockId, BlockStatus)])
+      memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory)
     }
 
     // Reserve
@@ -1062,11 +1077,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val smallList = List.fill(40)(new Array[Byte](100))
     val bigList = List.fill(40)(new Array[Byte](1000))
     val memoryStore = store.memoryStore
-    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll with all the space in the world. This should succeed and return an array.
-    var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+    var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     memoryStore.releasePendingUnrollMemoryForThisTask()
@@ -1074,24 +1088,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Unroll with not enough space. This should succeed after kicking out someBlock1.
     store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
     store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+    unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-    assert(droppedBlocks.size === 1)
-    assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
-    droppedBlocks.clear()
+    assert(memoryStore.contains("someBlock2"))
+    assert(!memoryStore.contains("someBlock1"))
     memoryStore.releasePendingUnrollMemoryForThisTask()
 
     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
     // In the mean time, however, we kicked out someBlock2 before giving up.
     store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks)
+    unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
     verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
-    assert(droppedBlocks.size === 1)
-    assert(droppedBlocks.head._1 === TestBlockId("someBlock2"))
-    droppedBlocks.clear()
+    assert(!memoryStore.contains("someBlock2"))
   }
 
   test("safely unroll blocks through putIterator") {
@@ -1238,7 +1249,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     })
     assert(result.size === 13000)
     assert(result.data === null)
-    assert(result.droppedBlocks === Nil)
   }
 
   test("put a small ByteBuffer to MemoryStore") {
@@ -1252,6 +1262,5 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     })
     assert(result.size === 10000)
     assert(result.data === Right(bytes))
-    assert(result.droppedBlocks === Nil)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b8cb548a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index faa5aca..e22e320 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -71,7 +71,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
 
     var numRecords: Option[Long] = None
 
-    val putResult: Seq[(BlockId, BlockStatus)] = block match {
+    val putSucceeded: Boolean = block match {
       case ArrayBufferBlock(arrayBuffer) =>
         numRecords = Some(arrayBuffer.size.toLong)
         blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
@@ -88,7 +88,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
         throw new SparkException(
           s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
     }
-    if (!putResult.map { _._1 }.contains(blockId)) {
+    if (!putSucceeded) {
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
     }
@@ -184,9 +184,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
-      val putResult =
+      val putSucceeded =
         blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
-      if (!putResult.map { _._1 }.contains(blockId)) {
+      if (!putSucceeded) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org