You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/03/11 00:08:46 UTC

[1/2] spark git commit: [SPARK-13696] Remove BlockStore class & simplify interfaces of mem. & disk stores

Repository: spark
Updated Branches:
  refs/heads/master 3d2b6f56e -> 81d48532d


http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
new file mode 100644
index 0000000..a80b235
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage.memory
+
+import java.nio.ByteBuffer
+import java.util.LinkedHashMap
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, SparkConf, TaskContext}
+import org.apache.spark.memory.MemoryManager
+import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}
+import org.apache.spark.util.{SizeEstimator, Utils}
+import org.apache.spark.util.collection.SizeTrackingVector
+
+private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
+
+/**
+ * Stores blocks in memory, either as Arrays of deserialized Java objects or as
+ * serialized ByteBuffers.
+ */
+private[spark] class MemoryStore(
+    conf: SparkConf,
+    blockManager: BlockManager,
+    memoryManager: MemoryManager)
+  extends Logging {
+
+  // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
+  // acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
+
+  private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
+
+  // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
+  // All accesses of this map are assumed to have manually synchronized on `memoryManager`
+  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+  // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
+  // Pending unroll memory refers to the intermediate memory occupied by a task
+  // after the unroll but before the actual putting of the block in the cache.
+  // This chunk of memory is expected to be released *as soon as* we finish
+  // caching the corresponding block as opposed to until after the task finishes.
+  // This is only used if a block is successfully unrolled in its entirety in
+  // memory (SPARK-4777).
+  private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()
+
+  // Initial memory to request before unrolling any block
+  private val unrollMemoryThreshold: Long =
+    conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+
+  /** Total amount of memory available for storage, in bytes. */
+  private def maxMemory: Long = memoryManager.maxStorageMemory
+
+  if (maxMemory < unrollMemoryThreshold) {
+    logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
+      s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " +
+      s"memory. Please configure Spark with more memory.")
+  }
+
+  logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
+
+  /** Total storage memory used including unroll memory, in bytes. */
+  private def memoryUsed: Long = memoryManager.storageMemoryUsed
+
+  /**
+   * Amount of storage memory, in bytes, used for caching blocks.
+   * This does not include memory used for unrolling.
+   */
+  private def blocksMemoryUsed: Long = memoryManager.synchronized {
+    memoryUsed - currentUnrollMemory
+  }
+
+  def getSize(blockId: BlockId): Long = {
+    entries.synchronized {
+      entries.get(blockId).size
+    }
+  }
+
+  /**
+   * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
+   * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
+   *
+   * The caller should guarantee that `size` is correct.
+   *
+   * @return true if the put() succeeded, false otherwise.
+   */
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
+    // Work on a duplicate - since the original input might be used elsewhere.
+    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
+    val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
+    if (putSuccess) {
+      assert(bytes.limit == size)
+    }
+    putSuccess
+  }
+
+  /**
+   * Attempt to put the given block in memory store.
+   *
+   * @return the estimated size of the stored data if the put() succeeded, or an iterator
+   *         in case the put() failed (the returned iterator lets callers fall back to the disk
+   *         store if desired).
+   */
+  private[storage] def putIterator(
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel): Either[Iterator[Any], Long] = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
+    val unrolledValues = unrollSafely(blockId, values)
+    unrolledValues match {
+      case Left(arrayValues) =>
+        // Values are fully unrolled in memory, so store them as an array
+        if (level.deserialized) {
+          val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
+          if (tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)) {
+            Right(sizeEstimate)
+          } else {
+            Left(arrayValues.toIterator)
+          }
+        } else {
+          val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
+          if (tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)) {
+            Right(bytes.limit())
+          } else {
+            Left(arrayValues.toIterator)
+          }
+        }
+      case Right(iteratorValues) =>
+        Left(iteratorValues)
+    }
+  }
+
+  def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+    val entry = entries.synchronized {
+      entries.get(blockId)
+    }
+    if (entry == null) {
+      None
+    } else {
+      require(!entry.deserialized, "should only call getBytes on blocks stored in serialized form")
+      Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
+    }
+  }
+
+  def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+    val entry = entries.synchronized {
+      entries.get(blockId)
+    }
+    if (entry == null) {
+      None
+    } else {
+      require(entry.deserialized, "should only call getValues on deserialized blocks")
+      Some(entry.value.asInstanceOf[Array[Any]].iterator)
+    }
+  }
+
+  def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
+    val entry = entries.synchronized {
+      entries.remove(blockId)
+    }
+    if (entry != null) {
+      memoryManager.releaseStorageMemory(entry.size)
+      logDebug(s"Block $blockId of size ${entry.size} dropped " +
+        s"from memory (free ${maxMemory - blocksMemoryUsed})")
+      true
+    } else {
+      false
+    }
+  }
+
+  def clear(): Unit = memoryManager.synchronized {
+    entries.synchronized {
+      entries.clear()
+    }
+    unrollMemoryMap.clear()
+    pendingUnrollMemoryMap.clear()
+    memoryManager.releaseAllStorageMemory()
+    logInfo("MemoryStore cleared")
+  }
+
+  /**
+   * Unroll the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM exceptions caused by
+   * unrolling the entirety of the block in memory at once. This is achieved by periodically
+   * checking whether the memory restrictions for unrolling blocks are still satisfied,
+   * stopping immediately if not. This check is a safeguard against the scenario in which
+   * there is not enough free memory to accommodate the entirety of a single block.
+   *
+   * This method returns either an array with the contents of the entire block or an iterator
+   * containing the values of the block (if the array would have exceeded available memory).
+   */
+  def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = {
+
+    // Number of elements unrolled so far
+    var elementsUnrolled = 0
+    // Whether there is still enough memory for us to continue unrolling this block
+    var keepUnrolling = true
+    // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing.
+    val initialMemoryThreshold = unrollMemoryThreshold
+    // How often to check whether we need to request more memory
+    val memoryCheckPeriod = 16
+    // Memory currently reserved by this task for this particular unrolling operation
+    var memoryThreshold = initialMemoryThreshold
+    // Memory to request as a multiple of current vector size
+    val memoryGrowthFactor = 1.5
+    // Keep track of pending unroll memory reserved by this method.
+    var pendingMemoryReserved = 0L
+    // Underlying vector for unrolling the block
+    var vector = new SizeTrackingVector[Any]
+
+    // Request enough memory to begin unrolling
+    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+
+    if (!keepUnrolling) {
+      logWarning(s"Failed to reserve initial memory threshold of " +
+        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
+    } else {
+      pendingMemoryReserved += initialMemoryThreshold
+    }
+
+    // Unroll this block safely, checking whether we have exceeded our threshold periodically
+    try {
+      while (values.hasNext && keepUnrolling) {
+        vector += values.next()
+        if (elementsUnrolled % memoryCheckPeriod == 0) {
+          // If our vector's size has exceeded the threshold, request more memory
+          val currentSize = vector.estimateSize()
+          if (currentSize >= memoryThreshold) {
+            val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
+            keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+            if (keepUnrolling) {
+              pendingMemoryReserved += amountToRequest
+            }
+            // New threshold is currentSize * memoryGrowthFactor
+            memoryThreshold += amountToRequest
+          }
+        }
+        elementsUnrolled += 1
+      }
+
+      if (keepUnrolling) {
+        // We successfully unrolled the entirety of this block
+        Left(vector.toArray)
+      } else {
+        // We ran out of space while unrolling the values for this block
+        logUnrollFailureMessage(blockId, vector.estimateSize())
+        Right(vector.iterator ++ values)
+      }
+
+    } finally {
+      // If we return an array, the values returned here will be cached in `tryToPut` later.
+      // In this case, we should release the memory only after we cache the block there.
+      if (keepUnrolling) {
+        val taskAttemptId = currentTaskAttemptId()
+        memoryManager.synchronized {
+          // Since we continue to hold onto the array until we actually cache it, we cannot
+          // release the unroll memory yet. Instead, we transfer it to pending unroll memory
+          // so `tryToPut` can further transfer it to normal storage memory later.
+          // TODO: we can probably express this without pending unroll memory (SPARK-10907)
+          unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
+          pendingUnrollMemoryMap(taskAttemptId) =
+            pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
+        }
+      } else {
+        // Otherwise, if we return an iterator, we can only release the unroll memory when
+        // the task finishes since we don't know when the iterator will be consumed.
+      }
+    }
+  }
+
+  /**
+   * Return the RDD ID that a given block ID is from, or None if it is not an RDD block.
+   */
+  private def getRddId(blockId: BlockId): Option[Int] = {
+    blockId.asRDDId.map(_.rddId)
+  }
+
+  /**
+   * Try to put in a set of values, if we can free up enough space. The value should either be
+   * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
+   * must also be passed by the caller.
+   *
+   * @return whether put was successful.
+   */
+  private def tryToPut(
+      blockId: BlockId,
+      value: () => Any,
+      size: Long,
+      deserialized: Boolean): Boolean = {
+    val acquiredEnoughStorageMemory = {
+      // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another
+      // task.
+      memoryManager.synchronized {
+        // Note: if we have previously unrolled this block successfully, then pending unroll
+        // memory should be non-zero. This is the amount that we already reserved during the
+        // unrolling process. In this case, we can just reuse this space to cache our block.
+        // The synchronization on `memoryManager` here guarantees that the release and acquire
+        // happen atomically. This relies on the assumption that all memory acquisitions are
+        // synchronized on the same lock.
+        releasePendingUnrollMemoryForThisTask()
+        memoryManager.acquireStorageMemory(blockId, size)
+      }
+    }
+
+    if (acquiredEnoughStorageMemory) {
+      // We acquired enough memory for the block, so go ahead and put it
+      val entry = new MemoryEntry(value(), size, deserialized)
+      entries.synchronized {
+        entries.put(blockId, entry)
+      }
+      val valuesOrBytes = if (deserialized) "values" else "bytes"
+      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
+        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+    * Try to evict blocks to free up a given amount of space to store a particular block.
+    * Can fail if either the block is bigger than our memory or it would require replacing
+    * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
+    * RDDs that don't fit into memory that we want to avoid).
+    *
+    * @param blockId the ID of the block we are freeing space for, if any
+    * @param space the size of this block
+    * @return the amount of memory (in bytes) freed by eviction
+    */
+  private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = {
+    assert(space > 0)
+    memoryManager.synchronized {
+      var freedMemory = 0L
+      val rddToAdd = blockId.flatMap(getRddId)
+      val selectedBlocks = new ArrayBuffer[BlockId]
+      def blockIsEvictable(blockId: BlockId): Boolean = {
+        rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
+      }
+      // This is synchronized to ensure that the set of entries is not changed
+      // (because of getValue or getBytes) while traversing the iterator, as that
+      // can lead to exceptions.
+      entries.synchronized {
+        val iterator = entries.entrySet().iterator()
+        while (freedMemory < space && iterator.hasNext) {
+          val pair = iterator.next()
+          val blockId = pair.getKey
+          if (blockIsEvictable(blockId)) {
+            // We don't want to evict blocks which are currently being read, so we need to obtain
+            // an exclusive write lock on blocks which are candidates for eviction. We perform a
+            // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
+            if (blockManager.blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
+              selectedBlocks += blockId
+              freedMemory += pair.getValue.size
+            }
+          }
+        }
+      }
+
+      if (freedMemory >= space) {
+        logInfo(s"${selectedBlocks.size} blocks selected for dropping")
+        for (blockId <- selectedBlocks) {
+          val entry = entries.synchronized { entries.get(blockId) }
+          // This should never be null as only one task should be dropping
+          // blocks and removing entries. However the check is still here for
+          // future safety.
+          if (entry != null) {
+            val data = if (entry.deserialized) {
+              Left(entry.value.asInstanceOf[Array[Any]])
+            } else {
+              Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+            }
+            val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
+            if (newEffectiveStorageLevel.isValid) {
+              // The block is still present in at least one store, so release the lock
+              // but don't delete the block info
+              blockManager.releaseLock(blockId)
+            } else {
+              // The block isn't present in any store, so delete the block info so that the
+              // block can be stored again
+              blockManager.blockInfoManager.removeBlock(blockId)
+            }
+          }
+        }
+        freedMemory
+      } else {
+        blockId.foreach { id =>
+          logInfo(s"Will not store $id as it would require dropping another block " +
+            "from the same RDD")
+        }
+        selectedBlocks.foreach { id =>
+          blockManager.releaseLock(id)
+        }
+        0L
+      }
+    }
+  }
+
+  def contains(blockId: BlockId): Boolean = {
+    entries.synchronized { entries.containsKey(blockId) }
+  }
+
+  private def currentTaskAttemptId(): Long = {
+    // In case this is called on the driver, return an invalid task attempt id.
+    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(-1L)
+  }
+
+  /**
+   * Reserve memory for unrolling the given block for this task.
+   *
+   * @return whether the request is granted.
+   */
+  def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
+    memoryManager.synchronized {
+      val success = memoryManager.acquireUnrollMemory(blockId, memory)
+      if (success) {
+        val taskAttemptId = currentTaskAttemptId()
+        unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
+      }
+      success
+    }
+  }
+
+  /**
+   * Release memory used by this task for unrolling blocks.
+   * If the amount is not specified, remove the current task's allocation altogether.
+   */
+  def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
+    val taskAttemptId = currentTaskAttemptId()
+    memoryManager.synchronized {
+      if (unrollMemoryMap.contains(taskAttemptId)) {
+        val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
+        if (memoryToRelease > 0) {
+          unrollMemoryMap(taskAttemptId) -= memoryToRelease
+          if (unrollMemoryMap(taskAttemptId) == 0) {
+            unrollMemoryMap.remove(taskAttemptId)
+          }
+          memoryManager.releaseUnrollMemory(memoryToRelease)
+        }
+      }
+    }
+  }
+
+  /**
+   * Release pending unroll memory of current unroll successful block used by this task
+   */
+  def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
+    val taskAttemptId = currentTaskAttemptId()
+    memoryManager.synchronized {
+      if (pendingUnrollMemoryMap.contains(taskAttemptId)) {
+        val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
+        if (memoryToRelease > 0) {
+          pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease
+          if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
+            pendingUnrollMemoryMap.remove(taskAttemptId)
+          }
+          memoryManager.releaseUnrollMemory(memoryToRelease)
+        }
+      }
+    }
+  }
+
+  /**
+   * Return the amount of memory currently occupied for unrolling blocks across all tasks.
+   */
+  def currentUnrollMemory: Long = memoryManager.synchronized {
+    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
+  }
+
+  /**
+   * Return the amount of memory currently occupied for unrolling blocks by this task.
+   */
+  def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
+    unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
+  }
+
+  /**
+   * Return the number of tasks currently unrolling blocks.
+   */
+  private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size }
+
+  /**
+   * Log information about current memory usage.
+   */
+  private def logMemoryUsage(): Unit = {
+    logInfo(
+      s"Memory use = ${Utils.bytesToString(blocksMemoryUsed)} (blocks) + " +
+      s"${Utils.bytesToString(currentUnrollMemory)} (scratch space shared across " +
+      s"$numTasksUnrolling tasks(s)) = ${Utils.bytesToString(memoryUsed)}. " +
+      s"Storage limit = ${Utils.bytesToString(maxMemory)}."
+    )
+  }
+
+  /**
+   * Log a warning for failing to unroll a block.
+   *
+   * @param blockId ID of the block we are trying to unroll.
+   * @param finalVectorSize Final size of the vector before unrolling failed.
+   */
+  private def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
+    logWarning(
+      s"Not enough space to cache $blockId in memory! " +
+      s"(computed ${Utils.bytesToString(finalVectorSize)} so far)"
+    )
+    logMemoryUsage()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 88fdbbd..f97cfbb 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -37,7 +37,7 @@ class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable {
     rdd.map { x =>
       val bm = SparkEnv.get.blockManager
       // Check if broadcast block was fetched
-      val isFound = bm.getLocal(BroadcastBlockId(bid)).isDefined
+      val isFound = bm.getLocalValues(BroadcastBlockId(bid)).isDefined
       (x, isFound)
     }.collect().toSet
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index d9764c7..686e948 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -31,7 +31,8 @@ import org.scalatest.BeforeAndAfterEach
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel}
+import org.apache.spark.storage.memory.MemoryStore
 
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index eee78d3..741d4fd 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
 import org.mockito.Mockito.when
 
 import org.apache.spark.SparkConf
-import org.apache.spark.storage.{MemoryStore, TestBlockId}
+import org.apache.spark.storage.TestBlockId
+import org.apache.spark.storage.memory.MemoryStore
 
 class StaticMemoryManagerSuite extends MemoryManagerSuite {
   private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 9686c66..9001a26 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
-import org.apache.spark.storage.{MemoryStore, TestBlockId}
+import org.apache.spark.storage.TestBlockId
+import org.apache.spark.storage.memory.MemoryStore
 
 class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
   private val dummyBlock = TestBlockId("--")

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index ae1faf5..b78a364 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -366,7 +366,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
         testStore => blockLocations.contains(testStore.blockManagerId.executorId)
       }.foreach { testStore =>
         val testStoreName = testStore.blockManagerId.executorId
-        assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+        assert(
+          testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
         testStore.releaseLock(blockId)
         assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
           s"master does not have status for ${blockId.name} in $testStoreName")

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0485b05..42595c8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.nio.{ByteBuffer, MappedByteBuffer}
-import java.util.Arrays
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -614,11 +613,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putSingle("a3", a3, storageLevel)
     assert(accessMethod("a2").isDefined, "a2 was not in store")
     assert(accessMethod("a3").isDefined, "a3 was not in store")
-    assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
     assert(accessMethod("a1").isDefined, "a1 was not in store")
     val dataShouldHaveBeenCachedBackIntoMemory = {
-      if (storageLevel.deserialized) !getAsBytes
-      else getAsBytes
+      if (storageLevel.deserialized) {
+        !getAsBytes
+      } else {
+        // If the block's storage level is serialized, then always cache the bytes in memory, even
+        // if the caller requested values.
+        true
+      }
     }
     if (dataShouldHaveBeenCachedBackIntoMemory) {
       assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
@@ -735,7 +738,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
     store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
-    assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
+    assert(!store.memoryStore.contains("a2"), "a2 was in memory store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
   }
 
@@ -829,50 +832,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }
 
-  test("reads of memory-mapped and non memory-mapped files are equivalent") {
-    val confKey = "spark.storage.memoryMapThreshold"
-
-    // Create a non-trivial (not all zeros) byte array
-    var counter = 0.toByte
-    def incr: Byte = {counter = (counter + 1).toByte; counter;}
-    val bytes = Array.fill[Byte](1000)(incr)
-    val byteBuffer = ByteBuffer.wrap(bytes)
-
-    val blockId = BlockId("rdd_1_2")
-
-    // This sequence of mocks makes these tests fairly brittle. It would
-    // be nice to refactor classes involved in disk storage in a way that
-    // allows for easier testing.
-    val blockManager = mock(classOf[BlockManager])
-    when(blockManager.conf).thenReturn(conf.clone.set(confKey, "0"))
-    val diskBlockManager = new DiskBlockManager(blockManager, conf)
-
-    val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
-    diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
-    val mapped = diskStoreMapped.getBytes(blockId).get
-
-    when(blockManager.conf).thenReturn(conf.clone.set(confKey, "1m"))
-    val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager)
-    diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
-    val notMapped = diskStoreNotMapped.getBytes(blockId).get
-
-    // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
-    assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
-      "Expected HeapByteBuffer for un-mapped read")
-    assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
-
-    def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
-      val array = new Array[Byte](in.remaining())
-      in.get(array)
-      array
-    }
-
-    val mappedAsArray = arrayFromByteBuffer(mapped)
-    val notMappedAsArray = arrayFromByteBuffer(notMapped)
-    assert(Arrays.equals(mappedAsArray, bytes))
-    assert(Arrays.equals(notMappedAsArray, bytes))
-  }
-
   test("updated block statuses") {
     store = makeBlockManager(12000)
     store.registerTask(0)
@@ -1232,19 +1191,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    // Unroll huge block with not enough space. This should fail and drop the new block to disk
-    // directly in addition to kicking out b2 in the process. Memory store should contain only
-    // b3, while disk store should contain b1, b2 and b4.
+    // Unroll huge block with not enough space. This should fail and return an iterator so that
+    // the block may be stored to disk. During the unrolling process, block "b2" should be kicked
+    // out, so the memory store should contain only b3, while the disk store should contain
+    // b1, b2 and b4.
     val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
-    assert(result4.isRight)
+    assert(result4.isLeft)
     assert(!memoryStore.contains("b1"))
     assert(!memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
     assert(!memoryStore.contains("b4"))
-    assert(diskStore.contains("b1"))
-    assert(diskStore.contains("b2"))
-    assert(!diskStore.contains("b3"))
-    assert(diskStore.contains("b4"))
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
   }
 
@@ -1366,7 +1322,7 @@ private object BlockManagerSuite {
       getLocalAndReleaseLock(blockId).isDefined
     }
 
-    val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocal)
+    val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues)
     val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
     val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
     val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 69e1746..bbfd6df 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -21,7 +21,6 @@ import java.io.{File, FileWriter}
 
 import scala.language.reflectiveCalls
 
-import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -33,8 +32,6 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
   private var rootDir1: File = _
   private var rootDirs: String = _
 
-  val blockManager = mock(classOf[BlockManager])
-  when(blockManager.conf).thenReturn(testConf)
   var diskBlockManager: DiskBlockManager = _
 
   override def beforeAll() {
@@ -57,7 +54,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     super.beforeEach()
     val conf = testConf.clone
     conf.set("spark.local.dir", rootDirs)
-    diskBlockManager = new DiskBlockManager(blockManager, conf)
+    diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
   }
 
   override def afterEach() {

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
new file mode 100644
index 0000000..97e74fe
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.Arrays
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class DiskStoreSuite extends SparkFunSuite {
+
+  test("reads of memory-mapped and non memory-mapped files are equivalent") {
+    val confKey = "spark.storage.memoryMapThreshold"
+
+    // Create a non-trivial (not all zeros) byte array
+    val bytes = Array.tabulate[Byte](1000)(_.toByte)
+    val byteBuffer = ByteBuffer.wrap(bytes)
+
+    val blockId = BlockId("rdd_1_2")
+    val diskBlockManager = new DiskBlockManager(new SparkConf(), deleteFilesOnStop = true)
+
+    val diskStoreMapped = new DiskStore(new SparkConf().set(confKey, "0"), diskBlockManager)
+    diskStoreMapped.putBytes(blockId, byteBuffer)
+    val mapped = diskStoreMapped.getBytes(blockId)
+    assert(diskStoreMapped.remove(blockId))
+
+    val diskStoreNotMapped = new DiskStore(new SparkConf().set(confKey, "1m"), diskBlockManager)
+    diskStoreNotMapped.putBytes(blockId, byteBuffer)
+    val notMapped = diskStoreNotMapped.getBytes(blockId)
+
+    // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
+    assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
+      "Expected HeapByteBuffer for un-mapped read")
+    assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
+
+    def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
+      val array = new Array[Byte](in.remaining())
+      in.get(array)
+      array
+    }
+
+    val mappedAsArray = arrayFromByteBuffer(mapped)
+    val notMappedAsArray = arrayFromByteBuffer(notMapped)
+    assert(Arrays.equals(mappedAsArray, bytes))
+    assert(Arrays.equals(notMappedAsArray, bytes))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index dd16fc3..45424f9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -106,7 +106,10 @@ class ReceivedBlockHandlerSuite
       testBlockStoring(handler) { case (data, blockIds, storeResults) =>
         // Verify the data in block manager is correct
         val storedData = blockIds.flatMap { blockId =>
-          blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
+          blockManager
+            .getLocalValues(blockId)
+            .map(_.data.map(_.toString).toList)
+            .getOrElse(List.empty)
         }.toList
         storedData shouldEqual data
 
@@ -130,7 +133,10 @@ class ReceivedBlockHandlerSuite
       testBlockStoring(handler) { case (data, blockIds, storeResults) =>
         // Verify the data in block manager is correct
         val storedData = blockIds.flatMap { blockId =>
-          blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
+          blockManager
+            .getLocalValues(blockId)
+            .map(_.data.map(_.toString).toList)
+            .getOrElse(List.empty)
         }.toList
         storedData shouldEqual data
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-13696] Remove BlockStore class & simplify interfaces of mem. & disk stores

Posted by an...@apache.org.
[SPARK-13696] Remove BlockStore class & simplify interfaces of mem. & disk stores

Today, both the MemoryStore and DiskStore implement a common `BlockStore` API, but I feel that this API is inappropriate because it abstracts away important distinctions between the behavior of these two stores.

For instance, the disk store doesn't have a notion of storing deserialized objects, so it's confusing for it to expose object-based APIs like putIterator() and getValues() instead of only exposing binary APIs and pushing the responsibilities of serialization and deserialization to the client. Similarly, the DiskStore put() methods accepted a `StorageLevel` parameter even though the disk store can only store blocks in one form.

As part of a larger BlockManager interface cleanup, this patch remove the BlockStore interface and refines the MemoryStore and DiskStore interfaces to reflect more narrow sets of responsibilities for those components. Some of the benefits of this interface cleanup are reflected in simplifications to several unit tests to eliminate now-unnecessary mocking, significant simplification of the BlockManager's `getLocal()` and `doPut()` methods, and a narrower API between the MemoryStore and DiskStore.

Author: Josh Rosen <jo...@databricks.com>

Closes #11534 from JoshRosen/remove-blockstore-interface.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81d48532
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81d48532
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81d48532

Branch: refs/heads/master
Commit: 81d48532d954a8aea28d7e1fb3aa32a78c708b63
Parents: 3d2b6f5
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Mar 10 15:08:41 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Mar 10 15:08:41 2016 -0800

----------------------------------------------------------------------
 .../spark/broadcast/TorrentBroadcast.scala      |   2 +-
 .../org/apache/spark/memory/MemoryManager.scala |   4 +-
 .../apache/spark/memory/StorageMemoryPool.scala |   4 +-
 .../org/apache/spark/storage/BlockManager.scala | 612 ++++++++++---------
 .../org/apache/spark/storage/BlockStore.scala   |  63 --
 .../apache/spark/storage/DiskBlockManager.scala |  15 +-
 .../org/apache/spark/storage/DiskStore.scala    | 114 ++--
 .../org/apache/spark/storage/MemoryStore.scala  | 566 -----------------
 .../spark/storage/memory/MemoryStore.scala      | 522 ++++++++++++++++
 .../apache/spark/broadcast/BroadcastSuite.scala |   2 +-
 .../spark/memory/MemoryManagerSuite.scala       |   3 +-
 .../spark/memory/StaticMemoryManagerSuite.scala |   3 +-
 .../memory/UnifiedMemoryManagerSuite.scala      |   3 +-
 .../storage/BlockManagerReplicationSuite.scala  |   3 +-
 .../spark/storage/BlockManagerSuite.scala       |  74 +--
 .../spark/storage/DiskBlockManagerSuite.scala   |   5 +-
 .../apache/spark/storage/DiskStoreSuite.scala   |  62 ++
 .../streaming/ReceivedBlockHandlerSuite.scala   |  10 +-
 18 files changed, 995 insertions(+), 1072 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index dabc810..550e1ba 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -173,7 +173,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)
       val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocal(broadcastId).map(_.data.next()) match {
+      blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
         case Some(x) =>
           releaseLock(broadcastId)
           x.asInstanceOf[T]

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index b5adbd8..e89b03e 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
 import javax.annotation.concurrent.GuardedBy
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.storage.{BlockId, MemoryStore}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.memory.MemoryStore
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.memory.MemoryAllocator
 
@@ -113,6 +114,7 @@ private[spark] abstract class MemoryManager(
 
   /**
    * Release all memory for the given task and mark it as inactive (e.g. when a task ends).
+   *
    * @return the number of bytes freed.
    */
   private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 6a88966..1d376ad 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
 import javax.annotation.concurrent.GuardedBy
 
 import org.apache.spark.Logging
-import org.apache.spark.storage.{BlockId, MemoryStore}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.memory.MemoryStore
 
 /**
  * Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
@@ -55,6 +56,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
 
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
+ *
    * @return whether all N bytes were successfully granted.
    */
   def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b38e2ec..873330e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,24 +40,15 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.serializer.{Serializer, SerializerInstance}
 import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.storage.memory._
 import org.apache.spark.util._
 
-private[spark] sealed trait BlockValues
-private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
-private[spark] case class IteratorValues(iterator: () => Iterator[Any]) extends BlockValues
-
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
     val readMethod: DataReadMethod.Value,
     val bytes: Long)
 
-// Class for representing return value of doPut()
-private sealed trait DoPutResult
-private case object DoPutSucceeded extends DoPutResult
-private case object DoPutBytesFailed extends DoPutResult
-private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult
-
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
@@ -78,7 +69,15 @@ private[spark] class BlockManager(
     numUsableCores: Int)
   extends BlockDataManager with Logging {
 
-  val diskBlockManager = new DiskBlockManager(this, conf)
+  private[spark] val externalShuffleServiceEnabled =
+    conf.getBoolean("spark.shuffle.service.enabled", false)
+
+  val diskBlockManager = {
+    // Only perform cleanup if an external service is not serving our shuffle files.
+    val deleteFilesOnStop =
+      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
+    new DiskBlockManager(conf, deleteFilesOnStop)
+  }
 
   private[storage] val blockInfoManager = new BlockInfoManager
 
@@ -86,8 +85,8 @@ private[spark] class BlockManager(
     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
 
   // Actual storage of where blocks are kept
-  private[spark] val memoryStore = new MemoryStore(this, memoryManager)
-  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+  private[spark] val memoryStore = new MemoryStore(conf, this, memoryManager)
+  private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
   memoryManager.setMemoryStore(memoryStore)
 
   // Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
@@ -96,9 +95,6 @@ private[spark] class BlockManager(
   // to revisit whether reporting this value as the "max" is intuitive to the user.
   private val maxMemory = memoryManager.maxStorageMemory
 
-  private[spark]
-  val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
-
   // Port used by the external shuffle service. In Yarn mode, this may be already be
   // set through the Hadoop configuration as the server is launched in the Yarn NM.
   private val externalShuffleServicePort = {
@@ -285,13 +281,9 @@ private[spark] class BlockManager(
     if (blockId.isShuffle) {
       shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
     } else {
-      val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
-        .asInstanceOf[Option[ByteBuffer]]
-      if (blockBytesOpt.isDefined) {
-        val buffer = blockBytesOpt.get
-        new BlockManagerManagedBuffer(this, blockId, buffer)
-      } else {
-        throw new BlockNotFoundException(blockId.toString)
+      getLocalBytes(blockId) match {
+        case Some(buffer) => new BlockManagerManagedBuffer(this, blockId, buffer)
+        case None => throw new BlockNotFoundException(blockId.toString)
       }
     }
   }
@@ -407,11 +399,71 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Get block from local block manager.
+   * Get block from local block manager as an iterator of Java objects.
    */
-  def getLocal(blockId: BlockId): Option[BlockResult] = {
+  def getLocalValues(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting local block $blockId")
-    doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
+    blockInfoManager.lockForReading(blockId) match {
+      case None =>
+        logDebug(s"Block $blockId was not found")
+        None
+      case Some(info) =>
+        val level = info.level
+        logDebug(s"Level for block $blockId is $level")
+        if (level.useMemory && memoryStore.contains(blockId)) {
+          val iter: Iterator[Any] = if (level.deserialized) {
+            memoryStore.getValues(blockId).get
+          } else {
+            dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+          }
+          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+          Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
+        } else if (level.useDisk && diskStore.contains(blockId)) {
+          val iterToReturn: Iterator[Any] = {
+            val diskBytes = diskStore.getBytes(blockId)
+            if (level.deserialized) {
+              val diskIterator = dataDeserialize(blockId, diskBytes)
+              if (level.useMemory) {
+                // Cache the values before returning them
+                memoryStore.putIterator(blockId, diskIterator, level) match {
+                  case Left(iter) =>
+                    // The memory store put() failed, so it returned the iterator back to us:
+                    iter
+                  case Right(_) =>
+                    // The put() succeeded, so we can read the values back:
+                    memoryStore.getValues(blockId).get
+                }
+              } else {
+                diskIterator
+              }
+            } else { // storage level is serialized
+              if (level.useMemory) {
+                // Cache the bytes back into memory to speed up subsequent reads.
+                val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
+                  // https://issues.apache.org/jira/browse/SPARK-6076
+                  // If the file size is bigger than the free memory, OOM will happen. So if we
+                  // cannot put it into MemoryStore, copyForMemory should not be created. That's why
+                  // this action is put into a `() => ByteBuffer` and created lazily.
+                  val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
+                  copyForMemory.put(diskBytes)
+                })
+                if (putSucceeded) {
+                  dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+                } else {
+                  dataDeserialize(blockId, diskBytes)
+                }
+              } else {
+                dataDeserialize(blockId, diskBytes)
+              }
+            }
+          }
+          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
+          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+        } else {
+          releaseLock(blockId)
+          throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        }
+    }
   }
 
   /**
@@ -428,77 +480,44 @@ private[spark] class BlockManager(
       Option(
         shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
     } else {
-      doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
-    }
-  }
-
-  private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
-    blockInfoManager.lockForReading(blockId) match {
-      case None =>
-        logDebug(s"Block $blockId was not found")
-        None
-      case Some(info) =>
-        doGetLocal(blockId, info, asBlockResult)
+      blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
     }
   }
 
   /**
-   * Get a local block from the block manager.
-   * Assumes that the caller holds a read lock on the block.
+   * Get block from the local block manager as serialized bytes.
+   *
+   * Must be called while holding a read lock on the block.
+   * Releases the read lock upon exception; keeps the read lock upon successful return.
    */
-  private def doGetLocal(
-      blockId: BlockId,
-      info: BlockInfo,
-      asBlockResult: Boolean): Option[Any] = {
+  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = {
     val level = info.level
     logDebug(s"Level for block $blockId is $level")
-
-    // Look for the block in memory
-    if (level.useMemory) {
-      logDebug(s"Getting block $blockId from memory")
-      val result = if (asBlockResult) {
-        memoryStore.getValues(blockId).map { iter =>
-          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
-          new BlockResult(ci, DataReadMethod.Memory, info.size)
-        }
+    // In order, try to read the serialized bytes from memory, then from disk, then fall back to
+    // serializing in-memory objects, and, finally, throw an exception if the block does not exist.
+    if (level.deserialized) {
+      // Try to avoid expensive serialization by reading a pre-serialized copy from disk:
+      if (level.useDisk && diskStore.contains(blockId)) {
+        // Note: we purposely do not try to put the block back into memory here. Since this branch
+        // handles deserialized blocks, this block may only be cached in memory as objects, not
+        // serialized bytes. Because the caller only requested bytes, it doesn't make sense to
+        // cache the block's deserialized objects since that caching may not have a payoff.
+        diskStore.getBytes(blockId)
+      } else if (level.useMemory && memoryStore.contains(blockId)) {
+        // The block was not found on disk, so serialize an in-memory copy:
+        dataSerialize(blockId, memoryStore.getValues(blockId).get)
       } else {
-        memoryStore.getBytes(blockId)
-      }
-      result match {
-        case Some(values) =>
-          return result
-        case None =>
-          logDebug(s"Block $blockId not found in memory")
-      }
-    }
-
-    // Look for block on disk, potentially storing it back in memory if required
-    if (level.useDisk) {
-      logDebug(s"Getting block $blockId from disk")
-      val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
-        case Some(b) => b
-        case None =>
-          releaseLock(blockId)
-          throw new BlockException(
-            blockId, s"Block $blockId not found on disk, though it should be")
+        releaseLock(blockId)
+        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
       }
-      assert(0 == bytes.position())
-
-      if (!level.useMemory) {
-        // If the block shouldn't be stored in memory, we can just return it
-        if (asBlockResult) {
-          val iter = CompletionIterator[Any, Iterator[Any]](
-            dataDeserialize(blockId, bytes), releaseLock(blockId))
-          return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
-        } else {
-          return Some(bytes)
-        }
-      } else {
-        // Otherwise, we also have to store something in the memory store
-        if (!level.deserialized && !asBlockResult) {
-          /* We'll store the bytes in memory if the block's storage level includes
-           * "memory serialized" and we requested its serialized bytes. */
-          memoryStore.putBytes(blockId, bytes.limit, () => {
+    } else {  // storage level is serialized
+      if (level.useMemory && memoryStore.contains(blockId)) {
+        memoryStore.getBytes(blockId).get
+      } else if (level.useDisk && diskStore.contains(blockId)) {
+        val bytes = diskStore.getBytes(blockId)
+        if (level.useMemory) {
+          // Cache the bytes back into memory to speed up subsequent reads.
+          val memoryStorePutSucceeded = memoryStore.putBytes(blockId, bytes.limit(), () => {
             // https://issues.apache.org/jira/browse/SPARK-6076
             // If the file size is bigger than the free memory, OOM will happen. So if we cannot
             // put it into MemoryStore, copyForMemory should not be created. That's why this
@@ -506,39 +525,19 @@ private[spark] class BlockManager(
             val copyForMemory = ByteBuffer.allocate(bytes.limit)
             copyForMemory.put(bytes)
           })
-          bytes.rewind()
-        }
-        if (!asBlockResult) {
-          return Some(bytes)
-        } else {
-          val values = dataDeserialize(blockId, bytes)
-          val valuesToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              // Cache the values before returning them
-              memoryStore.putIterator(blockId, values, level, allowPersistToDisk = false) match {
-                case Left(iter) =>
-                  // The memory store put() failed, so it returned the iterator back to us:
-                  iter
-                case Right(_) =>
-                  // The put() succeeded, so we can read the values back:
-                  memoryStore.getValues(blockId).get
-              }
-            } else {
-              values
-            }
+          if (memoryStorePutSucceeded) {
+            memoryStore.getBytes(blockId).get
+          } else {
+            bytes.rewind()
+            bytes
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](valuesToReturn, releaseLock(blockId))
-          return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+        } else {
+          bytes
         }
+      } else {
+        releaseLock(blockId)
+        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
       }
-    } else {
-      // This branch represents a case where the BlockInfoManager contained an entry for
-      // the block but the block could not be found in any of the block stores. This case
-      // should never occur, but for completeness's sake we address it here.
-      logError(
-        s"Block $blockId is supposedly stored locally but was not found in any block store")
-      releaseLock(blockId)
-      None
     }
   }
 
@@ -547,17 +546,10 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  def getRemote(blockId: BlockId): Option[BlockResult] = {
-    logDebug(s"Getting remote block $blockId")
-    doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
-  }
-
-  /**
-   * Get block from remote block managers as serialized bytes.
-   */
-  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
-    logDebug(s"Getting remote block $blockId as bytes")
-    doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
+  def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+    getRemoteBytes(blockId).map { data =>
+      new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit())
+    }
   }
 
   /**
@@ -570,7 +562,11 @@ private[spark] class BlockManager(
     preferredLocs ++ otherLocs
   }
 
-  private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
+  /**
+   * Get block from remote block managers as serialized bytes.
+   */
+  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+    logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
     val locations = getLocations(blockId)
     var numFetchFailures = 0
@@ -595,14 +591,7 @@ private[spark] class BlockManager(
       }
 
       if (data != null) {
-        if (asBlockResult) {
-          return Some(new BlockResult(
-            dataDeserialize(blockId, data),
-            DataReadMethod.Network,
-            data.limit()))
-        } else {
-          return Some(data)
-        }
+        return Some(data)
       }
       logDebug(s"The value of block $blockId is null")
     }
@@ -618,12 +607,12 @@ private[spark] class BlockManager(
    * automatically be freed once the result's `data` iterator is fully consumed.
    */
   def get(blockId: BlockId): Option[BlockResult] = {
-    val local = getLocal(blockId)
+    val local = getLocalValues(blockId)
     if (local.isDefined) {
       logInfo(s"Found block $blockId locally")
       return local
     }
-    val remote = getRemote(blockId)
+    val remote = getRemoteValues(blockId)
     if (remote.isDefined) {
       logInfo(s"Found block $blockId remotely")
       return remote
@@ -673,24 +662,26 @@ private[spark] class BlockManager(
       level: StorageLevel,
       makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
     // Initially we hold no locks on this block.
-    doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
-      case DoPutSucceeded =>
+    doPutIterator(blockId, makeIterator, level, keepReadLock = true) match {
+      case None =>
         // doPut() didn't hand work back to us, so the block already existed or was successfully
         // stored. Therefore, we now hold a read lock on the block.
-        val blockResult = get(blockId).getOrElse {
+        val blockResult = getLocalValues(blockId).getOrElse {
           // Since we held a read lock between the doPut() and get() calls, the block should not
           // have been evicted, so get() not returning the block indicates some internal error.
           releaseLock(blockId)
           throw new SparkException(s"get() failed for block $blockId even though we held a lock")
         }
+        // We already hold a read lock on the block from the doPut() call and getLocalValues()
+        // acquires the lock again, so we need to call releaseLock() here so that the net number
+        // of lock acquisitions is 1 (since the caller will only call release() once).
+        releaseLock(blockId)
         Left(blockResult)
-      case DoPutIteratorFailed(iter) =>
+      case Some(iter) =>
         // The put failed, likely because the data was too large to fit in memory and could not be
         // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
         // that they can decide what to do with the values (e.g. process them without caching).
        Right(iter)
-      case DoPutBytesFailed =>
-        throw new SparkException("doPut returned an invalid failure response")
     }
   }
 
@@ -701,16 +692,10 @@ private[spark] class BlockManager(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+      tellMaster: Boolean = true): Boolean = {
     require(values != null, "Values is null")
-    val result = doPut(
-      blockId,
-      IteratorValues(() => values),
-      level,
-      tellMaster,
-      effectiveStorageLevel)
-    result == DoPutSucceeded
+    // If doPut() didn't hand work back to us, then block already existed or was successfully stored
+    doPutIterator(blockId, () => values, level, tellMaster).isEmpty
   }
 
   /**
@@ -739,46 +724,105 @@ private[spark] class BlockManager(
       blockId: BlockId,
       bytes: ByteBuffer,
       level: StorageLevel,
-      tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+      tellMaster: Boolean = true): Boolean = {
     require(bytes != null, "Bytes is null")
-    val result = doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
-    result == DoPutSucceeded
+    doPutBytes(blockId, bytes, level, tellMaster)
   }
 
   /**
-   * Put the given block according to the given level in one of the block stores, replicating
+   * Put the given bytes according to the given level in one of the block stores, replicating
    * the values if necessary.
    *
    * If the block already exists, this method will not overwrite it.
    *
-   * @param effectiveStorageLevel the level according to which the block will actually be handled.
-   *                              This allows the caller to specify an alternate behavior of doPut
-   *                              while preserving the original level specified by the user.
    * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
    *                     block already exists). If false, this method will hold no locks when it
    *                     returns.
-   * @return [[DoPutSucceeded]] if the block was already present or if the put succeeded, or
-   *        [[DoPutBytesFailed]] if the put failed and we were storing bytes, or
-   *        [[DoPutIteratorFailed]] if the put failed and we were storing an iterator.
+   * @return true if the block was already present or if the put succeeded, false otherwise.
    */
-  private def doPut(
+  private def doPutBytes(
       blockId: BlockId,
-      data: BlockValues,
+      bytes: ByteBuffer,
       level: StorageLevel,
       tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None,
-      keepReadLock: Boolean = false): DoPutResult = {
+      keepReadLock: Boolean = false): Boolean = {
+    doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+      val startTimeMs = System.currentTimeMillis
+      // Since we're storing bytes, initiate the replication before storing them locally.
+      // This is faster as data is already serialized and ready to send.
+      val replicationFuture = if (level.replication > 1) {
+        // Duplicate doesn't copy the bytes, but just creates a wrapper
+        val bufferView = bytes.duplicate()
+        Future {
+          // This is a blocking action and should run in futureExecutionContext which is a cached
+          // thread pool
+          replicate(blockId, bufferView, level)
+        }(futureExecutionContext)
+      } else {
+        null
+      }
+
+      bytes.rewind()
+      val size = bytes.limit()
+
+      if (level.useMemory) {
+        // Put it in memory first, even if it also has useDisk set to true;
+        // We will drop it to disk later if the memory store can't hold it.
+        val putSucceeded = if (level.deserialized) {
+          val values = dataDeserialize(blockId, bytes.duplicate())
+          memoryStore.putIterator(blockId, values, level).isRight
+        } else {
+          memoryStore.putBytes(blockId, size, () => bytes)
+        }
+        if (!putSucceeded && level.useDisk) {
+          logWarning(s"Persisting block $blockId to disk instead.")
+          diskStore.putBytes(blockId, bytes)
+        }
+      } else if (level.useDisk) {
+        diskStore.putBytes(blockId, bytes)
+      }
+
+      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+      if (blockWasSuccessfullyStored) {
+        // Now that the block is in either the memory, externalBlockStore, or disk store,
+        // tell the master about it.
+        putBlockInfo.size = size
+        if (tellMaster) {
+          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+        }
+        Option(TaskContext.get()).foreach { c =>
+          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+        }
+      }
+      logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+      if (level.replication > 1) {
+        // Wait for asynchronous replication to finish
+        Await.ready(replicationFuture, Duration.Inf)
+      }
+      if (blockWasSuccessfullyStored) {
+        None
+      } else {
+        Some(bytes)
+      }
+    }.isEmpty
+  }
+
+  /**
+   * Helper method used to abstract common code from [[doPutBytes()]] and [[doPutIterator()]].
+   *
+   * @param putBody a function which attempts the actual put() and returns None on success
+   *                or Some on failure.
+   */
+  private def doPut[T](
+      blockId: BlockId,
+      level: StorageLevel,
+      tellMaster: Boolean,
+      keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
-    effectiveStorageLevel.foreach { level =>
-      require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
-    }
 
-    /* Remember the block's storage level so that we can correctly drop it to disk if it needs
-     * to be dropped right after it got put into memory. Note, however, that other threads will
-     * not be able to get() this block until we call markReady on its BlockInfo. */
     val putBlockInfo = {
       val newInfo = new BlockInfo(level, tellMaster)
       if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
@@ -789,138 +833,113 @@ private[spark] class BlockManager(
           // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
           releaseLock(blockId)
         }
-        return DoPutSucceeded
+        return None
       }
     }
 
     val startTimeMs = System.currentTimeMillis
-
-    // Size of the block in bytes
-    var size = 0L
-
-    // The level we actually use to put the block
-    val putLevel = effectiveStorageLevel.getOrElse(level)
-
-    // If we're storing bytes, then initiate the replication before storing them locally.
-    // This is faster as data is already serialized and ready to send.
-    val replicationFuture = data match {
-      case b: ByteBufferValues if putLevel.replication > 1 =>
-        // Duplicate doesn't copy the bytes, but just creates a wrapper
-        val bufferView = b.buffer.duplicate()
-        Future {
-          // This is a blocking action and should run in futureExecutionContext which is a cached
-          // thread pool
-          replicate(blockId, bufferView, putLevel)
-        }(futureExecutionContext)
-      case _ => null
-    }
-
-    var blockWasSuccessfullyStored = false
-    var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
-
-    putBlockInfo.synchronized {
-      logTrace("Put for block %s took %s to get into synchronized block"
-        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
-
-      try {
-        if (putLevel.useMemory) {
-          // Put it in memory first, even if it also has useDisk set to true;
-          // We will drop it to disk later if the memory store can't hold it.
-          data match {
-            case IteratorValues(iterator) =>
-              memoryStore.putIterator(blockId, iterator(), putLevel) match {
-                case Right(s) =>
-                  size = s
-                case Left(iter) =>
-                  iteratorFromFailedMemoryStorePut = Some(iter)
-              }
-            case ByteBufferValues(bytes) =>
-              bytes.rewind()
-              size = bytes.limit()
-              memoryStore.putBytes(blockId, bytes, putLevel)
-          }
-        } else if (putLevel.useDisk) {
-          data match {
-            case IteratorValues(iterator) =>
-              diskStore.putIterator(blockId, iterator(), putLevel) match {
-                case Right(s) =>
-                  size = s
-                // putIterator() will never return Left (see its return type).
-              }
-            case ByteBufferValues(bytes) =>
-              bytes.rewind()
-              size = bytes.limit()
-              diskStore.putBytes(blockId, bytes, putLevel)
-          }
+    var blockWasSuccessfullyStored: Boolean = false
+    val result: Option[T] = try {
+      val res = putBody(putBlockInfo)
+      blockWasSuccessfullyStored = res.isEmpty
+      res
+    } finally {
+      if (blockWasSuccessfullyStored) {
+        if (keepReadLock) {
+          blockInfoManager.downgradeLock(blockId)
         } else {
-          assert(putLevel == StorageLevel.NONE)
-          throw new BlockException(
-            blockId, s"Attempted to put block $blockId without specifying storage level!")
+          blockInfoManager.unlock(blockId)
         }
-
-        val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
-        blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
-        if (blockWasSuccessfullyStored) {
-          // Now that the block is in either the memory, externalBlockStore, or disk store,
-          // let other threads read it, and tell the master about it.
-          putBlockInfo.size = size
-          if (tellMaster) {
-            reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
-          }
-          Option(TaskContext.get()).foreach { c =>
-            c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
-          }
-        }
-      } finally {
-        if (blockWasSuccessfullyStored) {
-          if (keepReadLock) {
-            blockInfoManager.downgradeLock(blockId)
-          } else {
-            blockInfoManager.unlock(blockId)
-          }
-        } else {
-          blockInfoManager.removeBlock(blockId)
-          logWarning(s"Putting block $blockId failed")
-        }
-      }
-    }
-    logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
-
-    if (replicationFuture != null) {
-      // Wait for asynchronous replication to finish
-      Await.ready(replicationFuture, Duration.Inf)
-    } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
-      val remoteStartTime = System.currentTimeMillis
-      val bytesToReplicate: ByteBuffer = {
-        doGetLocal(blockId, putBlockInfo, asBlockResult = false)
-          .map(_.asInstanceOf[ByteBuffer])
-          .getOrElse {
-            throw new SparkException(s"Block $blockId was not found even though it was just stored")
-          }
-      }
-      try {
-        replicate(blockId, bytesToReplicate, putLevel)
-      } finally {
-        BlockManager.dispose(bytesToReplicate)
+      } else {
+        blockInfoManager.removeBlock(blockId)
+        logWarning(s"Putting block $blockId failed")
       }
-      logDebug("Put block %s remotely took %s"
-        .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
     }
-
-    if (putLevel.replication > 1) {
+    if (level.replication > 1) {
       logDebug("Putting block %s with replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     } else {
       logDebug("Putting block %s without replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     }
+    result
+  }
 
-    if (blockWasSuccessfullyStored) {
-      DoPutSucceeded
-    } else if (iteratorFromFailedMemoryStorePut.isDefined) {
-      DoPutIteratorFailed(iteratorFromFailedMemoryStorePut.get)
-    } else {
-      DoPutBytesFailed
+  /**
+   * Put the given block according to the given level in one of the block stores, replicating
+   * the values if necessary.
+   *
+   * If the block already exists, this method will not overwrite it.
+   *
+   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
+   *                     block already exists). If false, this method will hold no locks when it
+   *                     returns.
+   * @return None if the block was already present or if the put succeeded, or Some(iterator)
+   *         if the put failed.
+   */
+  private def doPutIterator(
+      blockId: BlockId,
+      iterator: () => Iterator[Any],
+      level: StorageLevel,
+      tellMaster: Boolean = true,
+      keepReadLock: Boolean = false): Option[Iterator[Any]] = {
+    doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+      val startTimeMs = System.currentTimeMillis
+      var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
+      // Size of the block in bytes
+      var size = 0L
+      if (level.useMemory) {
+        // Put it in memory first, even if it also has useDisk set to true;
+        // We will drop it to disk later if the memory store can't hold it.
+        memoryStore.putIterator(blockId, iterator(), level) match {
+          case Right(s) =>
+            size = s
+          case Left(iter) =>
+            // Not enough space to unroll this block; drop to disk if applicable
+            if (level.useDisk) {
+              logWarning(s"Persisting block $blockId to disk instead.")
+              diskStore.put(blockId) { fileOutputStream =>
+                dataSerializeStream(blockId, fileOutputStream, iter)
+              }
+              size = diskStore.getSize(blockId)
+            } else {
+              iteratorFromFailedMemoryStorePut = Some(iter)
+            }
+        }
+      } else if (level.useDisk) {
+        diskStore.put(blockId) { fileOutputStream =>
+          dataSerializeStream(blockId, fileOutputStream, iterator())
+        }
+        size = diskStore.getSize(blockId)
+      }
+
+      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+      if (blockWasSuccessfullyStored) {
+        // Now that the block is in either the memory, externalBlockStore, or disk store,
+        // tell the master about it.
+        putBlockInfo.size = size
+        if (tellMaster) {
+          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+        }
+        Option(TaskContext.get()).foreach { c =>
+          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+        }
+        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+        if (level.replication > 1) {
+          val remoteStartTime = System.currentTimeMillis
+          val bytesToReplicate = doGetLocalBytes(blockId, putBlockInfo)
+          try {
+            replicate(blockId, bytesToReplicate, level)
+          } finally {
+            BlockManager.dispose(bytesToReplicate)
+          }
+          logDebug("Put block %s remotely took %s"
+            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
+        }
+      }
+      assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
+      iteratorFromFailedMemoryStorePut
     }
   }
 
@@ -1077,9 +1096,11 @@ private[spark] class BlockManager(
       logInfo(s"Writing block $blockId to disk")
       data() match {
         case Left(elements) =>
-          diskStore.putIterator(blockId, elements.toIterator, level)
+          diskStore.put(blockId) { fileOutputStream =>
+            dataSerializeStream(blockId, fileOutputStream, elements.toIterator)
+          }
         case Right(bytes) =>
-          diskStore.putBytes(blockId, bytes, level)
+          diskStore.putBytes(blockId, bytes)
       }
       blockIsUpdated = true
     }
@@ -1229,7 +1250,6 @@ private[spark] class BlockManager(
     rpcEnv.stop(slaveEndpoint)
     blockInfoManager.clear()
     memoryStore.clear()
-    diskStore.clear()
     futureExecutionContext.shutdownNow()
     logInfo("BlockManager stopped")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
deleted file mode 100644
index b069918..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.Logging
-
-/**
- * Abstract class to store blocks.
- */
-private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
-
-  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit
-
-  /**
-   * Attempt to store an iterator of values.
-   *
-   * @return an iterator of values (in case the put failed), or the estimated size of the stored
-   *         values if the put succeeded.
-   */
-  def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel): Either[Iterator[Any], Long]
-
-  /**
-   * Return the size of a block in bytes.
-   */
-  def getSize(blockId: BlockId): Long
-
-  def getBytes(blockId: BlockId): Option[ByteBuffer]
-
-  def getValues(blockId: BlockId): Option[Iterator[Any]]
-
-  /**
-   * Remove a block, if it exists.
-   *
-   * @param blockId the block to remove.
-   * @return True if the block was found and removed, False otherwise.
-   * @throws IllegalStateException if the block is pinned by a task.
-   */
-  def remove(blockId: BlockId): Boolean
-
-  def contains(blockId: BlockId): Boolean
-
-  def clear() { }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 4daf22f..e51d96e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -26,18 +26,14 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * Creates and maintains the logical mapping between logical blocks and physical on-disk
- * locations. By default, one block is mapped to one file with a name given by its BlockId.
- * However, it is also possible to have a block map to only a segment of a file, by calling
- * mapBlockToFileSegment().
+ * locations. One block is mapped to one file with a name given by its BlockId.
  *
  * Block files are hashed among the directories listed in spark.local.dir (or in
  * SPARK_LOCAL_DIRS, if it's set).
  */
-private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
-  extends Logging {
+private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
 
-  private[spark]
-  val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
+  private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)
 
   /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
    * directory, create multiple subdirectories that we will hash files into, in order to avoid
@@ -163,10 +159,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
 
   private def doStop(): Unit = {
-    // Only perform cleanup if an external service is not serving our shuffle files.
-    // Also blockManagerId could be null if block manager is not initialized properly.
-    if (!blockManager.externalShuffleServiceEnabled ||
-      (blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
+    if (deleteFilesOnStop) {
       localDirs.foreach { localDir =>
         if (localDir.isDirectory() && localDir.exists()) {
           try {

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index e35aa1b..caecd97 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,112 +17,100 @@
 
 package org.apache.spark.storage
 
-import java.io.{File, FileOutputStream, IOException, RandomAccessFile}
+import java.io.{FileOutputStream, IOException, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
-import org.apache.spark.Logging
+import com.google.common.io.Closeables
+
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.Utils
 
 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
-  extends BlockStore(blockManager) with Logging {
+private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
 
-  val minMemoryMapBytes = blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+  private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
 
-  override def getSize(blockId: BlockId): Long = {
+  def getSize(blockId: BlockId): Long = {
     diskManager.getFile(blockId.name).length
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
-    // So that we do not modify the input offsets !
-    // duplicate does not copy buffer, so inexpensive
-    val bytes = _bytes.duplicate()
+  /**
+   * Invokes the provided callback function to write the specific block.
+   *
+   * @throws IllegalStateException if the block already exists in the disk store.
+   */
+  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+    if (contains(blockId)) {
+      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
+    }
     logDebug(s"Attempting to put block $blockId")
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
-    val channel = new FileOutputStream(file).getChannel
-    Utils.tryWithSafeFinally {
-      while (bytes.remaining > 0) {
-        channel.write(bytes)
+    val fileOutputStream = new FileOutputStream(file)
+    var threwException: Boolean = true
+    try {
+      writeFunc(fileOutputStream)
+      threwException = false
+    } finally {
+      try {
+        Closeables.close(fileOutputStream, threwException)
+      } finally {
+         if (threwException) {
+          remove(blockId)
+        }
       }
-    } {
-      channel.close()
     }
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
-      file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
+      file.getName,
+      Utils.bytesToString(file.length()),
+      finishTime - startTime))
   }
 
-  override def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel): Right[Iterator[Any], Long] = {
-    logDebug(s"Attempting to write values for block $blockId")
-    val startTime = System.currentTimeMillis
-    val file = diskManager.getFile(blockId)
-    val outputStream = new FileOutputStream(file)
-    try {
+  def putBytes(blockId: BlockId, _bytes: ByteBuffer): Unit = {
+    // So that we do not modify the input offsets !
+    // duplicate does not copy buffer, so inexpensive
+    val bytes = _bytes.duplicate()
+    put(blockId) { fileOutputStream =>
+      val channel = fileOutputStream.getChannel
       Utils.tryWithSafeFinally {
-        blockManager.dataSerializeStream(blockId, outputStream, values)
+        while (bytes.remaining > 0) {
+          channel.write(bytes)
+        }
       } {
-        // Close outputStream here because it should be closed before file is deleted.
-        outputStream.close()
+        channel.close()
       }
-    } catch {
-      case e: Throwable =>
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
-        throw e
     }
-
-    val length = file.length
-
-    val timeTaken = System.currentTimeMillis - startTime
-    logDebug("Block %s stored as %s file on disk in %d ms".format(
-      file.getName, Utils.bytesToString(length), timeTaken))
-
-    Right(length)
   }
 
-  private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
+  def getBytes(blockId: BlockId): ByteBuffer = {
+    val file = diskManager.getFile(blockId.name)
     val channel = new RandomAccessFile(file, "r").getChannel
     Utils.tryWithSafeFinally {
       // For small files, directly read rather than memory map
-      if (length < minMemoryMapBytes) {
-        val buf = ByteBuffer.allocate(length.toInt)
-        channel.position(offset)
+      if (file.length < minMemoryMapBytes) {
+        val buf = ByteBuffer.allocate(file.length.toInt)
+        channel.position(0)
         while (buf.remaining() != 0) {
           if (channel.read(buf) == -1) {
             throw new IOException("Reached EOF before filling buffer\n" +
-              s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
           }
         }
         buf.flip()
-        Some(buf)
+        buf
       } else {
-        Some(channel.map(MapMode.READ_ONLY, offset, length))
+        channel.map(MapMode.READ_ONLY, 0, file.length)
       }
     } {
       channel.close()
     }
   }
 
-  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val file = diskManager.getFile(blockId.name)
-    getBytes(file, 0, file.length)
-  }
-
-  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
-  }
-
-  override def remove(blockId: BlockId): Boolean = {
+  def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
       val ret = file.delete()
@@ -135,7 +123,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     }
   }
 
-  override def contains(blockId: BlockId): Boolean = {
+  def contains(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     file.exists()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
deleted file mode 100644
index bb72fe4..0000000
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ /dev/null
@@ -1,566 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-import java.util.LinkedHashMap
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.TaskContext
-import org.apache.spark.memory.MemoryManager
-import org.apache.spark.util.{SizeEstimator, Utils}
-import org.apache.spark.util.collection.SizeTrackingVector
-
-private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
-
-/**
- * Stores blocks in memory, either as Arrays of deserialized Java objects or as
- * serialized ByteBuffers.
- */
-private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
-  extends BlockStore(blockManager) {
-
-  // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
-  // acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
-
-  private val conf = blockManager.conf
-  private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
-
-  // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
-  // All accesses of this map are assumed to have manually synchronized on `memoryManager`
-  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
-  // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
-  // Pending unroll memory refers to the intermediate memory occupied by a task
-  // after the unroll but before the actual putting of the block in the cache.
-  // This chunk of memory is expected to be released *as soon as* we finish
-  // caching the corresponding block as opposed to until after the task finishes.
-  // This is only used if a block is successfully unrolled in its entirety in
-  // memory (SPARK-4777).
-  private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()
-
-  // Initial memory to request before unrolling any block
-  private val unrollMemoryThreshold: Long =
-    conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
-
-  /** Total amount of memory available for storage, in bytes. */
-  private def maxMemory: Long = memoryManager.maxStorageMemory
-
-  if (maxMemory < unrollMemoryThreshold) {
-    logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
-      s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " +
-      s"memory. Please configure Spark with more memory.")
-  }
-
-  logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
-
-  /** Total storage memory used including unroll memory, in bytes. */
-  private def memoryUsed: Long = memoryManager.storageMemoryUsed
-
-  /**
-   * Amount of storage memory, in bytes, used for caching blocks.
-   * This does not include memory used for unrolling.
-   */
-  private def blocksMemoryUsed: Long = memoryManager.synchronized {
-    memoryUsed - currentUnrollMemory
-  }
-
-  override def getSize(blockId: BlockId): Long = {
-    entries.synchronized {
-      entries.get(blockId).size
-    }
-  }
-
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
-    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
-    // Work on a duplicate - since the original input might be used elsewhere.
-    val bytes = _bytes.duplicate()
-    bytes.rewind()
-    if (level.deserialized) {
-      val values = blockManager.dataDeserialize(blockId, bytes)
-      putIterator(blockId, values, level)
-    } else {
-      tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-    }
-  }
-
-  /**
-   * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
-   * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
-   *
-   * The caller should guarantee that `size` is correct.
-   */
-  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
-    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
-    // Work on a duplicate - since the original input might be used elsewhere.
-    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
-    val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
-    if (putSuccess) {
-      assert(bytes.limit == size)
-    }
-  }
-
-  override def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel): Either[Iterator[Any], Long] = {
-    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
-    putIterator(blockId, values, level, allowPersistToDisk = true)
-  }
-
-  /**
-   * Attempt to put the given block in memory store.
-   *
-   * There may not be enough space to fully unroll the iterator in memory, in which case we
-   * optionally drop the values to disk if
-   *   (1) the block's storage level specifies useDisk, and
-   *   (2) `allowPersistToDisk` is true.
-   *
-   * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
-   * back from disk and attempts to cache it in memory. In this case, we should not persist the
-   * block back on disk again, as it is already in disk store.
-   */
-  private[storage] def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel,
-      allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
-    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
-    val unrolledValues = unrollSafely(blockId, values)
-    unrolledValues match {
-      case Left(arrayValues) =>
-        // Values are fully unrolled in memory, so store them as an array
-        val size = {
-          if (level.deserialized) {
-            val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
-            tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
-            sizeEstimate
-          } else {
-            val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
-            tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-            bytes.limit()
-          }
-        }
-        Right(size)
-      case Right(iteratorValues) =>
-        // Not enough space to unroll this block; drop to disk if applicable
-        if (level.useDisk && allowPersistToDisk) {
-          logWarning(s"Persisting block $blockId to disk instead.")
-          blockManager.diskStore.putIterator(blockId, iteratorValues, level)
-        } else {
-          Left(iteratorValues)
-        }
-    }
-  }
-
-  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val entry = entries.synchronized {
-      entries.get(blockId)
-    }
-    if (entry == null) {
-      None
-    } else if (entry.deserialized) {
-      Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
-    } else {
-      Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
-    }
-  }
-
-  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    val entry = entries.synchronized {
-      entries.get(blockId)
-    }
-    if (entry == null) {
-      None
-    } else if (entry.deserialized) {
-      Some(entry.value.asInstanceOf[Array[Any]].iterator)
-    } else {
-      val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
-      Some(blockManager.dataDeserialize(blockId, buffer))
-    }
-  }
-
-  override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
-    val entry = entries.synchronized {
-      entries.remove(blockId)
-    }
-    if (entry != null) {
-      memoryManager.releaseStorageMemory(entry.size)
-      logDebug(s"Block $blockId of size ${entry.size} dropped " +
-        s"from memory (free ${maxMemory - blocksMemoryUsed})")
-      true
-    } else {
-      false
-    }
-  }
-
-  override def clear(): Unit = memoryManager.synchronized {
-    entries.synchronized {
-      entries.clear()
-    }
-    unrollMemoryMap.clear()
-    pendingUnrollMemoryMap.clear()
-    memoryManager.releaseAllStorageMemory()
-    logInfo("MemoryStore cleared")
-  }
-
-  /**
-   * Unroll the given block in memory safely.
-   *
-   * The safety of this operation refers to avoiding potential OOM exceptions caused by
-   * unrolling the entirety of the block in memory at once. This is achieved by periodically
-   * checking whether the memory restrictions for unrolling blocks are still satisfied,
-   * stopping immediately if not. This check is a safeguard against the scenario in which
-   * there is not enough free memory to accommodate the entirety of a single block.
-   *
-   * This method returns either an array with the contents of the entire block or an iterator
-   * containing the values of the block (if the array would have exceeded available memory).
-   */
-  def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = {
-
-    // Number of elements unrolled so far
-    var elementsUnrolled = 0
-    // Whether there is still enough memory for us to continue unrolling this block
-    var keepUnrolling = true
-    // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing.
-    val initialMemoryThreshold = unrollMemoryThreshold
-    // How often to check whether we need to request more memory
-    val memoryCheckPeriod = 16
-    // Memory currently reserved by this task for this particular unrolling operation
-    var memoryThreshold = initialMemoryThreshold
-    // Memory to request as a multiple of current vector size
-    val memoryGrowthFactor = 1.5
-    // Keep track of pending unroll memory reserved by this method.
-    var pendingMemoryReserved = 0L
-    // Underlying vector for unrolling the block
-    var vector = new SizeTrackingVector[Any]
-
-    // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
-
-    if (!keepUnrolling) {
-      logWarning(s"Failed to reserve initial memory threshold of " +
-        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
-    } else {
-      pendingMemoryReserved += initialMemoryThreshold
-    }
-
-    // Unroll this block safely, checking whether we have exceeded our threshold periodically
-    try {
-      while (values.hasNext && keepUnrolling) {
-        vector += values.next()
-        if (elementsUnrolled % memoryCheckPeriod == 0) {
-          // If our vector's size has exceeded the threshold, request more memory
-          val currentSize = vector.estimateSize()
-          if (currentSize >= memoryThreshold) {
-            val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
-            keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
-            if (keepUnrolling) {
-              pendingMemoryReserved += amountToRequest
-            }
-            // New threshold is currentSize * memoryGrowthFactor
-            memoryThreshold += amountToRequest
-          }
-        }
-        elementsUnrolled += 1
-      }
-
-      if (keepUnrolling) {
-        // We successfully unrolled the entirety of this block
-        Left(vector.toArray)
-      } else {
-        // We ran out of space while unrolling the values for this block
-        logUnrollFailureMessage(blockId, vector.estimateSize())
-        Right(vector.iterator ++ values)
-      }
-
-    } finally {
-      // If we return an array, the values returned here will be cached in `tryToPut` later.
-      // In this case, we should release the memory only after we cache the block there.
-      if (keepUnrolling) {
-        val taskAttemptId = currentTaskAttemptId()
-        memoryManager.synchronized {
-          // Since we continue to hold onto the array until we actually cache it, we cannot
-          // release the unroll memory yet. Instead, we transfer it to pending unroll memory
-          // so `tryToPut` can further transfer it to normal storage memory later.
-          // TODO: we can probably express this without pending unroll memory (SPARK-10907)
-          unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
-          pendingUnrollMemoryMap(taskAttemptId) =
-            pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
-        }
-      } else {
-        // Otherwise, if we return an iterator, we can only release the unroll memory when
-        // the task finishes since we don't know when the iterator will be consumed.
-      }
-    }
-  }
-
-  /**
-   * Return the RDD ID that a given block ID is from, or None if it is not an RDD block.
-   */
-  private def getRddId(blockId: BlockId): Option[Int] = {
-    blockId.asRDDId.map(_.rddId)
-  }
-
-  /**
-   * Try to put in a set of values, if we can free up enough space. The value should either be
-   * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
-   * must also be passed by the caller.
-   *
-   * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
-   * created to avoid OOM since it may be a big ByteBuffer.
-   *
-   * Synchronize on `memoryManager` to ensure that all the put requests and its associated block
-   * dropping is done by only on thread at a time. Otherwise while one thread is dropping
-   * blocks to free memory for one block, another thread may use up the freed space for
-   * another block.
-   *
-   * @return whether put was successful.
-   */
-  private def tryToPut(
-      blockId: BlockId,
-      value: () => Any,
-      size: Long,
-      deserialized: Boolean): Boolean = {
-
-    /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
-     * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
-     * been released, it must be ensured that those to-be-dropped blocks are not double counted
-     * for freeing up more space for another block that needs to be put. Only then the actually
-     * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
-
-    memoryManager.synchronized {
-      // Note: if we have previously unrolled this block successfully, then pending unroll
-      // memory should be non-zero. This is the amount that we already reserved during the
-      // unrolling process. In this case, we can just reuse this space to cache our block.
-      // The synchronization on `memoryManager` here guarantees that the release and acquire
-      // happen atomically. This relies on the assumption that all memory acquisitions are
-      // synchronized on the same lock.
-      releasePendingUnrollMemoryForThisTask()
-      val enoughMemory = memoryManager.acquireStorageMemory(blockId, size)
-      if (enoughMemory) {
-        // We acquired enough memory for the block, so go ahead and put it
-        val entry = new MemoryEntry(value(), size, deserialized)
-        entries.synchronized {
-          entries.put(blockId, entry)
-        }
-        val valuesOrBytes = if (deserialized) "values" else "bytes"
-        logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
-          blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
-      } else {
-        // Tell the block manager that we couldn't put it in memory so that it can drop it to
-        // disk if the block allows disk storage.
-        lazy val data = if (deserialized) {
-          Left(value().asInstanceOf[Array[Any]])
-        } else {
-          Right(value().asInstanceOf[ByteBuffer].duplicate())
-        }
-        blockManager.dropFromMemory(blockId, () => data)
-      }
-      enoughMemory
-    }
-  }
-
-  /**
-    * Try to evict blocks to free up a given amount of space to store a particular block.
-    * Can fail if either the block is bigger than our memory or it would require replacing
-    * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
-    * RDDs that don't fit into memory that we want to avoid).
-    *
-    * @param blockId the ID of the block we are freeing space for, if any
-    * @param space the size of this block
-    * @return the amount of memory (in bytes) freed by eviction
-    */
-  private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = {
-    assert(space > 0)
-    memoryManager.synchronized {
-      var freedMemory = 0L
-      val rddToAdd = blockId.flatMap(getRddId)
-      val selectedBlocks = new ArrayBuffer[BlockId]
-      def blockIsEvictable(blockId: BlockId): Boolean = {
-        rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
-      }
-      // This is synchronized to ensure that the set of entries is not changed
-      // (because of getValue or getBytes) while traversing the iterator, as that
-      // can lead to exceptions.
-      entries.synchronized {
-        val iterator = entries.entrySet().iterator()
-        while (freedMemory < space && iterator.hasNext) {
-          val pair = iterator.next()
-          val blockId = pair.getKey
-          if (blockIsEvictable(blockId)) {
-            // We don't want to evict blocks which are currently being read, so we need to obtain
-            // an exclusive write lock on blocks which are candidates for eviction. We perform a
-            // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
-            if (blockManager.blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
-              selectedBlocks += blockId
-              freedMemory += pair.getValue.size
-            }
-          }
-        }
-      }
-
-      if (freedMemory >= space) {
-        logInfo(s"${selectedBlocks.size} blocks selected for dropping")
-        for (blockId <- selectedBlocks) {
-          val entry = entries.synchronized { entries.get(blockId) }
-          // This should never be null as only one task should be dropping
-          // blocks and removing entries. However the check is still here for
-          // future safety.
-          if (entry != null) {
-            val data = if (entry.deserialized) {
-              Left(entry.value.asInstanceOf[Array[Any]])
-            } else {
-              Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
-            }
-            val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
-            if (newEffectiveStorageLevel.isValid) {
-              // The block is still present in at least one store, so release the lock
-              // but don't delete the block info
-              blockManager.releaseLock(blockId)
-            } else {
-              // The block isn't present in any store, so delete the block info so that the
-              // block can be stored again
-              blockManager.blockInfoManager.removeBlock(blockId)
-            }
-          }
-        }
-        freedMemory
-      } else {
-        blockId.foreach { id =>
-          logInfo(s"Will not store $id as it would require dropping another block " +
-            "from the same RDD")
-        }
-        selectedBlocks.foreach { id =>
-          blockManager.releaseLock(id)
-        }
-        0L
-      }
-    }
-  }
-
-  override def contains(blockId: BlockId): Boolean = {
-    entries.synchronized { entries.containsKey(blockId) }
-  }
-
-  private def currentTaskAttemptId(): Long = {
-    // In case this is called on the driver, return an invalid task attempt id.
-    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(-1L)
-  }
-
-  /**
-   * Reserve memory for unrolling the given block for this task.
-   *
-   * @return whether the request is granted.
-   */
-  def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
-    memoryManager.synchronized {
-      val success = memoryManager.acquireUnrollMemory(blockId, memory)
-      if (success) {
-        val taskAttemptId = currentTaskAttemptId()
-        unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
-      }
-      success
-    }
-  }
-
-  /**
-   * Release memory used by this task for unrolling blocks.
-   * If the amount is not specified, remove the current task's allocation altogether.
-   */
-  def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
-    val taskAttemptId = currentTaskAttemptId()
-    memoryManager.synchronized {
-      if (unrollMemoryMap.contains(taskAttemptId)) {
-        val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
-        if (memoryToRelease > 0) {
-          unrollMemoryMap(taskAttemptId) -= memoryToRelease
-          if (unrollMemoryMap(taskAttemptId) == 0) {
-            unrollMemoryMap.remove(taskAttemptId)
-          }
-          memoryManager.releaseUnrollMemory(memoryToRelease)
-        }
-      }
-    }
-  }
-
-  /**
-   * Release pending unroll memory of current unroll successful block used by this task
-   */
-  def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
-    val taskAttemptId = currentTaskAttemptId()
-    memoryManager.synchronized {
-      if (pendingUnrollMemoryMap.contains(taskAttemptId)) {
-        val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
-        if (memoryToRelease > 0) {
-          pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease
-          if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
-            pendingUnrollMemoryMap.remove(taskAttemptId)
-          }
-          memoryManager.releaseUnrollMemory(memoryToRelease)
-        }
-      }
-    }
-  }
-
-  /**
-   * Return the amount of memory currently occupied for unrolling blocks across all tasks.
-   */
-  def currentUnrollMemory: Long = memoryManager.synchronized {
-    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
-  }
-
-  /**
-   * Return the amount of memory currently occupied for unrolling blocks by this task.
-   */
-  def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
-    unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
-  }
-
-  /**
-   * Return the number of tasks currently unrolling blocks.
-   */
-  private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size }
-
-  /**
-   * Log information about current memory usage.
-   */
-  private def logMemoryUsage(): Unit = {
-    logInfo(
-      s"Memory use = ${Utils.bytesToString(blocksMemoryUsed)} (blocks) + " +
-      s"${Utils.bytesToString(currentUnrollMemory)} (scratch space shared across " +
-      s"$numTasksUnrolling tasks(s)) = ${Utils.bytesToString(memoryUsed)}. " +
-      s"Storage limit = ${Utils.bytesToString(maxMemory)}."
-    )
-  }
-
-  /**
-   * Log a warning for failing to unroll a block.
-   *
-   * @param blockId ID of the block we are trying to unroll.
-   * @param finalVectorSize Final size of the vector before unrolling failed.
-   */
-  private def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
-    logWarning(
-      s"Not enough space to cache $blockId in memory! " +
-      s"(computed ${Utils.bytesToString(finalVectorSize)} so far)"
-    )
-    logMemoryUsage()
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org