Posted to commits@spark.apache.org by pw...@apache.org on 2014/06/13 05:41:25 UTC

git commit: [Minor] Fix style, formatting and naming in BlockManager etc.

Repository: spark
Updated Branches:
  refs/heads/master 1de1d703b -> 44daec5ab


[Minor] Fix style, formatting and naming in BlockManager etc.

This is a precursor to a bigger change. I wanted to separate out the relatively insignificant changes so the ultimate PR is not inflated.

(Warning: this PR is full of unimportant nitpicks)

Author: Andrew Or <an...@gmail.com>

Closes #1058 from andrewor14/bm-minor and squashes the following commits:

8e12eaf [Andrew Or] SparkException -> BlockException
c36fd53 [Andrew Or] Make parts of BlockManager more readable
0a5f378 [Andrew Or] Entry -> MemoryEntry
e9762a5 [Andrew Or] Tone down string interpolation (minor reverts)
c4de9ac [Andrew Or] Merge branch 'master' of github.com:apache/spark into bm-minor
b3470f1 [Andrew Or] More string interpolation (minor)
7f9dcab [Andrew Or] Use string interpolation (minor)
94a425b [Andrew Or] Refactor against duplicate code + minor changes
8a6a7dc [Andrew Or] Exception -> SparkException
97c410f [Andrew Or] Deal with MIMA excludes
2480f1d [Andrew Or] Fixes in StorageLevel.scala
abb0163 [Andrew Or] Style, formatting and naming fixes
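
For readers tracing the string-interpolation commits above (7f9dcab, b3470f1, e9762a5): the change replaces concatenation and format-string calls with Scala's s interpolator, reverting to .format only where interpolation would hurt readability. A minimal standalone illustration of the pattern (hypothetical names, not code from this diff):

    object InterpolationExample extends App {
      val blockId = "rdd_0_1"
      val elapsedMs = 42L

      // Before: concatenation and format strings
      println("Looking for partition " + blockId)
      println("Put block %s locally took %d ms".format(blockId, elapsedMs))

      // After: the s interpolator, as used throughout this commit
      println(s"Looking for partition $blockId")
      println(s"Put block $blockId locally took $elapsedMs ms")
    }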


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44daec5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44daec5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44daec5a

Branch: refs/heads/master
Commit: 44daec5abd4c271ea0003ecdabab92cc958dea13
Parents: 1de1d70
Author: Andrew Or <an...@gmail.com>
Authored: Thu Jun 12 20:40:58 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jun 12 20:40:58 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   |  20 +-
 .../org/apache/spark/storage/BlockInfo.scala    |  22 +-
 .../org/apache/spark/storage/BlockManager.scala | 479 +++++++++----------
 .../apache/spark/storage/DiskBlockManager.scala |  21 +-
 .../org/apache/spark/storage/DiskStore.scala    |  24 +-
 .../org/apache/spark/storage/MemoryStore.scala  |  32 +-
 .../org/apache/spark/storage/StorageLevel.scala |  88 ++--
 .../org/apache/spark/storage/TachyonStore.scala |  39 +-
 project/MimaExcludes.scala                      |   7 +-
 9 files changed, 362 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/44daec5a/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 811610c..315ed91 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -32,10 +32,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   private val loading = new HashSet[RDDBlockId]()
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
-  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
+  def getOrCompute[T](
+      rdd: RDD[T],
+      split: Partition,
+      context: TaskContext,
       storageLevel: StorageLevel): Iterator[T] = {
+
     val key = RDDBlockId(rdd.id, split.index)
-    logDebug("Looking for partition " + key)
+    logDebug(s"Looking for partition $key")
     blockManager.get(key) match {
       case Some(values) =>
         // Partition is already materialized, so just return its values
@@ -45,7 +49,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         // Mark the split as loading (unless someone else marks it first)
         loading.synchronized {
           if (loading.contains(key)) {
-            logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
+            logInfo(s"Another thread is loading $key, waiting for it to finish...")
             while (loading.contains(key)) {
               try {
                 loading.wait()
@@ -54,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
                   logWarning(s"Got an exception while waiting for another thread to load $key", e)
               }
             }
-            logInfo("Finished waiting for %s".format(key))
+            logInfo(s"Finished waiting for $key")
             /* See whether someone else has successfully loaded it. The main way this would fail
              * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
              * partition but we didn't want to make space for it. However, that case is unlikely
@@ -64,7 +68,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
               case Some(values) =>
                 return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
               case None =>
-                logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
+                logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
                 loading.add(key)
             }
           } else {
@@ -73,7 +77,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         }
         try {
           // If we got here, we have to load the split
-          logInfo("Partition %s not found, computing it".format(key))
+          logInfo(s"Partition $key not found, computing it")
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
 
           // Persist the result, so long as the task is not running locally
@@ -97,8 +101,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
                 case Some(values) =>
                   values.asInstanceOf[Iterator[T]]
                 case None =>
-                  logInfo("Failure to store %s".format(key))
-                  throw new Exception("Block manager failed to return persisted valued")
+                  logInfo(s"Failure to store $key")
+                  throw new SparkException("Block manager failed to return persisted value")
               }
             } else {
               // In this case the RDD is cached to an array buffer. This will save the results
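
The CacheManager hunks above tighten logging and the getOrCompute signature around an unchanged concurrency scheme: a shared loading set guarded by synchronized, with wait()/notifyAll() so one thread computes a partition while the rest block. A stripped-down sketch of that scheme (a hypothetical LoadingGuard, not the real class; the real code additionally re-checks the block manager after waking and may retry the load itself):

    import scala.collection.mutable.HashSet

    class LoadingGuard[K] {
      private val loading = new HashSet[K]()

      /** Returns true if the caller won the race and should compute the value. */
      def acquire(key: K): Boolean = loading.synchronized {
        if (loading.contains(key)) {
          // Someone else is loading this key; wait until they finish
          while (loading.contains(key)) {
            loading.wait()
          }
          false
        } else {
          loading.add(key)
          true
        }
      }

      /** Called by the winning thread once the value is stored (or the load failed). */
      def release(key: K): Unit = loading.synchronized {
        loading.remove(key)
        loading.notifyAll()
      }
    }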

http://git-wip-us.apache.org/repos/asf/spark/blob/44daec5a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
index c8f3976..22fdf73 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -29,9 +29,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
   setInitThread()
 
   private def setInitThread() {
-    // Set current thread as init thread - waitForReady will not block this thread
-    // (in case there is non trivial initialization which ends up calling waitForReady as part of
-    // initialization itself)
+    /* Set current thread as init thread - waitForReady will not block this thread
+     * (in case there is non trivial initialization which ends up calling waitForReady
+     * as part of initialization itself) */
     BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
   }
 
@@ -42,7 +42,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
   def waitForReady(): Boolean = {
     if (pending && initThread != Thread.currentThread()) {
       synchronized {
-        while (pending) this.wait()
+        while (pending) {
+          this.wait()
+        }
       }
     }
     !failed
@@ -50,8 +52,8 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
 
   /** Mark this BlockInfo as ready (i.e. block is finished writing) */
   def markReady(sizeInBytes: Long) {
-    require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
-    assert (pending)
+    require(sizeInBytes >= 0, s"sizeInBytes was negative: $sizeInBytes")
+    assert(pending)
     size = sizeInBytes
     BlockInfo.blockInfoInitThreads.remove(this)
     synchronized {
@@ -61,7 +63,7 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
 
   /** Mark this BlockInfo as ready but failed */
   def markFailure() {
-    assert (pending)
+    assert(pending)
     size = BlockInfo.BLOCK_FAILED
     BlockInfo.blockInfoInitThreads.remove(this)
     synchronized {
@@ -71,9 +73,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
 }
 
 private object BlockInfo {
-  // initThread is logically a BlockInfo field, but we store it here because
-  // it's only needed while this block is in the 'pending' state and we want
-  // to minimize BlockInfo's memory footprint.
+  /* initThread is logically a BlockInfo field, but we store it here because
+   * it's only needed while this block is in the 'pending' state and we want
+   * to minimize BlockInfo's memory footprint. */
   private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
 
   private val BLOCK_PENDING: Long = -1L
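
BlockInfo's waitForReady/markReady/markFailure trio, reformatted above, is essentially a one-shot latch built on intrinsic locks. A self-contained sketch of the same pattern (a hypothetical ResultLatch, without BlockInfo's init-thread bookkeeping):

    class ResultLatch {
      private var pending = true
      private var failed = false

      /** Blocks until the result is marked; returns false if it failed. */
      def waitForReady(): Boolean = synchronized {
        while (pending) {
          wait()
        }
        !failed
      }

      def markReady(): Unit = synchronized {
        pending = false
        notifyAll()
      }

      def markFailure(): Unit = synchronized {
        pending = false
        failed = true
        notifyAll()
      }
    }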

http://git-wip-us.apache.org/repos/asf/spark/blob/44daec5a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 9cd79d2..f52bc70 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -28,46 +28,48 @@ import scala.util.Random
 import akka.actor.{ActorSystem, Cancellable, Props}
 import sun.nio.ch.DirectBuffer
 
-import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, SparkConf, SparkEnv, SparkException}
+import org.apache.spark._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util._
 
-private[spark] sealed trait Values
-
-private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends Values
-private[spark] case class IteratorValues(iterator: Iterator[Any]) extends Values
-private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
+private[spark] sealed trait BlockValues
+private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
+private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
+private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
 
 private[spark] class BlockManager(
     executorId: String,
     actorSystem: ActorSystem,
     val master: BlockManagerMaster,
-    val defaultSerializer: Serializer,
+    defaultSerializer: Serializer,
     maxMemory: Long,
-    val _conf: SparkConf,
+    val conf: SparkConf,
     securityManager: SecurityManager,
     mapOutputTracker: MapOutputTracker)
   extends Logging {
 
-  def conf = _conf
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
-    conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")))
+    conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+  val connectionManager = new ConnectionManager(0, conf, securityManager)
+
+  implicit val futureExecContext = connectionManager.futureExecContext
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
+  // Actual storage of where blocks are kept
+  private var tachyonInitialized = false
   private[storage] val memoryStore = new MemoryStore(this, maxMemory)
   private[storage] val diskStore = new DiskStore(this, diskBlockManager)
-  var tachyonInitialized = false
   private[storage] lazy val tachyonStore: TachyonStore = {
     val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
     val appFolderName = conf.get("spark.tachyonStore.folderName")
-    val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
+    val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
     val tachyonMaster = conf.get("spark.tachyonStore.url",  "tachyon://localhost:19998")
-    val tachyonBlockManager = new TachyonBlockManager(
-      shuffleBlockManager, tachyonStorePath, tachyonMaster)
+    val tachyonBlockManager =
+      new TachyonBlockManager(shuffleBlockManager, tachyonStorePath, tachyonMaster)
     tachyonInitialized = true
     new TachyonStore(this, tachyonBlockManager)
   }
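
A side note on the Values -> BlockValues rename above: the trait stays sealed, so every match over it (see the consolidated put path later in this diff) is checked for exhaustiveness by the compiler. A tiny self-contained sketch, with the case classes repeated:

    import java.nio.ByteBuffer
    import scala.collection.mutable.ArrayBuffer

    sealed trait BlockValues
    case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
    case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
    case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues

    // sealed => the compiler warns if a case is missing here
    def describe(data: BlockValues): String = data match {
      case IteratorValues(_)      => "an iterator of values"
      case ArrayBufferValues(buf) => s"an array buffer of ${buf.size} elements"
      case ByteBufferValues(b)    => s"a byte buffer of ${b.remaining()} bytes"
    }
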
@@ -79,43 +81,39 @@ private[spark] class BlockManager(
     if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
   }
 
-  val connectionManager = new ConnectionManager(0, conf, securityManager)
-  implicit val futureExecContext = connectionManager.futureExecContext
-
   val blockManagerId = BlockManagerId(
     executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
 
   // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
   // for receiving shuffle outputs)
-  val maxBytesInFlight =
-    conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024
+  val maxBytesInFlight = conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024
 
   // Whether to compress broadcast variables that are stored
-  val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
+  private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
   // Whether to compress shuffle output that are stored
-  val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
+  private val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
   // Whether to compress RDD partitions that are stored serialized
-  val compressRdds = conf.getBoolean("spark.rdd.compress", false)
+  private val compressRdds = conf.getBoolean("spark.rdd.compress", false)
   // Whether to compress shuffle output temporarily spilled to disk
-  val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
+  private val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
 
-  val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
-
-  val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this, mapOutputTracker)),
+  private val slaveActor = actorSystem.actorOf(
+    Props(new BlockManagerSlaveActor(this, mapOutputTracker)),
     name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
 
-  // Pending re-registration action being executed asynchronously or null if none
-  // is pending. Accesses should synchronize on asyncReregisterLock.
-  var asyncReregisterTask: Future[Unit] = null
-  val asyncReregisterLock = new Object
+  // Pending re-registration action being executed asynchronously or null if none is pending.
+  // Accesses should synchronize on asyncReregisterLock.
+  private var asyncReregisterTask: Future[Unit] = null
+  private val asyncReregisterLock = new Object
 
-  private def heartBeat() {
+  private def heartBeat(): Unit = {
     if (!master.sendHeartBeat(blockManagerId)) {
       reregister()
     }
   }
 
-  var heartBeatTask: Cancellable = null
+  private val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
+  private var heartBeatTask: Cancellable = null
 
   private val metadataCleaner = new MetadataCleaner(
     MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
@@ -124,11 +122,11 @@ private[spark] class BlockManager(
 
   initialize()
 
-  // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
-  // the initialization of the compression codec until it is first used. The reason is that a Spark
-  // program could be using a user-defined codec in a third party jar, which is loaded in
-  // Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
-  // loaded yet.
+  /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
+   * the initialization of the compression codec until it is first used. The reason is that a Spark
+   * program could be using a user-defined codec in a third party jar, which is loaded in
+   * Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
+   * loaded yet. */
   private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
 
   /**
@@ -150,7 +148,7 @@ private[spark] class BlockManager(
    * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
    * BlockManagerWorker actor.
    */
-  private def initialize() {
+  private def initialize(): Unit = {
     master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
@@ -170,12 +168,12 @@ private[spark] class BlockManager(
    * heart beat attempt or new block registration and another try to re-register all blocks
    * will be made then.
    */
-  private def reportAllBlocks() {
-    logInfo("Reporting " + blockInfo.size + " blocks to the master.")
+  private def reportAllBlocks(): Unit = {
+    logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
     for ((blockId, info) <- blockInfo) {
       val status = getCurrentBlockStatus(blockId, info)
       if (!tryToReportBlockStatus(blockId, info, status)) {
-        logError("Failed to report " + blockId + " to master; giving up.")
+        logError(s"Failed to report $blockId to master; giving up.")
         return
       }
     }
@@ -187,7 +185,7 @@ private[spark] class BlockManager(
    *
    * Note that this method must be called without any BlockInfo locks held.
    */
-  def reregister() {
+  private def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo("BlockManager re-registering with master")
     master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
@@ -197,7 +195,7 @@ private[spark] class BlockManager(
   /**
    * Re-register with the master sometime soon.
    */
-  def asyncReregister() {
+  private def asyncReregister(): Unit = {
     asyncReregisterLock.synchronized {
       if (asyncReregisterTask == null) {
         asyncReregisterTask = Future[Unit] {
@@ -213,7 +211,7 @@ private[spark] class BlockManager(
   /**
    * For testing. Wait for any pending asynchronous re-registration; otherwise, do nothing.
    */
-  def waitForAsyncReregister() {
+  def waitForAsyncReregister(): Unit = {
     val task = asyncReregisterTask
     if (task != null) {
       Await.ready(task, Duration.Inf)
@@ -251,18 +249,18 @@ private[spark] class BlockManager(
    * it is still valid). This ensures that update in master will compensate for the increase in
    * memory on slave.
    */
-  def reportBlockStatus(
+  private def reportBlockStatus(
       blockId: BlockId,
       info: BlockInfo,
       status: BlockStatus,
-      droppedMemorySize: Long = 0L) {
+      droppedMemorySize: Long = 0L): Unit = {
     val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
     if (needReregister) {
-      logInfo("Got told to re-register updating block " + blockId)
+      logInfo(s"Got told to re-register updating block $blockId")
       // Re-registering will report our new block for free.
       asyncReregister()
     }
-    logDebug("Told master about block " + blockId)
+    logDebug(s"Told master about block $blockId")
   }
 
   /**
@@ -293,10 +291,10 @@ private[spark] class BlockManager(
    * and the updated in-memory and on-disk sizes.
    */
   private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
-    val (newLevel, inMemSize, onDiskSize, inTachyonSize) = info.synchronized {
+    info.synchronized {
       info.level match {
         case null =>
-          (StorageLevel.NONE, 0L, 0L, 0L)
+          BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
         case level =>
           val inMem = level.useMemory && memoryStore.contains(blockId)
           val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
@@ -307,19 +305,18 @@ private[spark] class BlockManager(
           val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
           val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
           val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
-          (storageLevel, memSize, diskSize, tachyonSize)
+          BlockStatus(storageLevel, memSize, diskSize, tachyonSize)
       }
     }
-    BlockStatus(newLevel, inMemSize, onDiskSize, inTachyonSize)
   }
 
   /**
    * Get locations of an array of blocks.
    */
-  def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
+  private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
     val startTimeMs = System.currentTimeMillis
     val locations = master.getLocations(blockIds).toArray
-    logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
+    logDebug("Got multiple block location in %s".format(Utils.getUsedTimeMs(startTimeMs)))
     locations
   }
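
The getCurrentBlockStatus change just above leans on synchronized being an expression in Scala: the BlockStatus is built and returned from inside the lock instead of being smuggled out through a temporary tuple. The same idiom in miniature (a hypothetical Counter):

    class Counter {
      private var n = 0

      // synchronized is an expression; its last line is the method's result
      def incrementAndGet(): Int = synchronized {
        n += 1
        n
      }
    }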
 
@@ -329,15 +326,16 @@ private[spark] class BlockManager(
    * never deletes (recent) items.
    */
   def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    diskStore.getValues(blockId, serializer).orElse(
-      sys.error("Block " + blockId + " not found on disk, though it should be"))
+    diskStore.getValues(blockId, serializer).orElse {
+      throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be")
+    }
   }
 
   /**
    * Get block from local block manager.
    */
   def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
-    logDebug("Getting local block " + blockId)
+    logDebug(s"Getting local block $blockId")
     doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
   }
 
@@ -345,7 +343,7 @@ private[spark] class BlockManager(
    * Get block from the local block manager as serialized bytes.
    */
   def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
-    logDebug("Getting local block " + blockId + " as bytes")
+    logDebug(s"Getting local block $blockId as bytes")
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
     if (blockId.isShuffle) {
@@ -353,7 +351,8 @@ private[spark] class BlockManager(
         case Some(bytes) =>
           Some(bytes)
         case None =>
-          throw new Exception("Block " + blockId + " not found on disk, though it should be")
+          throw new BlockException(
+            blockId, s"Block $blockId not found on disk, though it should be")
       }
     } else {
       doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
@@ -368,16 +367,16 @@ private[spark] class BlockManager(
         // If another thread is writing the block, wait for it to become ready.
         if (!info.waitForReady()) {
           // If we get here, the block write failed.
-          logWarning("Block " + blockId + " was marked as failure.")
+          logWarning(s"Block $blockId was marked as failure.")
           return None
         }
 
         val level = info.level
-        logDebug("Level for block " + blockId + " is " + level)
+        logDebug(s"Level for block $blockId is $level")
 
         // Look for the block in memory
         if (level.useMemory) {
-          logDebug("Getting block " + blockId + " from memory")
+          logDebug(s"Getting block $blockId from memory")
           val result = if (asValues) {
             memoryStore.getValues(blockId)
           } else {
@@ -387,51 +386,51 @@ private[spark] class BlockManager(
             case Some(values) =>
               return Some(values)
             case None =>
-              logDebug("Block " + blockId + " not found in memory")
+              logDebug(s"Block $blockId not found in memory")
           }
         }
 
         // Look for the block in Tachyon
         if (level.useOffHeap) {
-          logDebug("Getting block " + blockId + " from tachyon")
+          logDebug(s"Getting block $blockId from tachyon")
           if (tachyonStore.contains(blockId)) {
             tachyonStore.getBytes(blockId) match {
-              case Some(bytes) => {
+              case Some(bytes) =>
                 if (!asValues) {
                   return Some(bytes)
                 } else {
                   return Some(dataDeserialize(blockId, bytes))
                 }
-              }
               case None =>
-                logDebug("Block " + blockId + " not found in tachyon")
+                logDebug(s"Block $blockId not found in tachyon")
             }
           }
         }
 
-        // Look for block on disk, potentially storing it back into memory if required:
+        // Look for block on disk, potentially storing it back in memory if required
         if (level.useDisk) {
-          logDebug("Getting block " + blockId + " from disk")
+          logDebug(s"Getting block $blockId from disk")
           val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
-            case Some(bytes) => bytes
+            case Some(b) => b
             case None =>
-              throw new Exception("Block " + blockId + " not found on disk, though it should be")
+              throw new BlockException(
+                blockId, s"Block $blockId not found on disk, though it should be")
           }
-          assert (0 == bytes.position())
+          assert(0 == bytes.position())
 
           if (!level.useMemory) {
-            // If the block shouldn't be stored in memory, we can just return it:
+            // If the block shouldn't be stored in memory, we can just return it
             if (asValues) {
               return Some(dataDeserialize(blockId, bytes))
             } else {
               return Some(bytes)
             }
           } else {
-            // Otherwise, we also have to store something in the memory store:
+            // Otherwise, we also have to store something in the memory store
             if (!level.deserialized || !asValues) {
-              // We'll store the bytes in memory if the block's storage level includes
-              // "memory serialized", or if it should be cached as objects in memory
-              // but we only requested its serialized bytes:
+              /* We'll store the bytes in memory if the block's storage level includes
+               * "memory serialized", or if it should be cached as objects in memory
+               * but we only requested its serialized bytes. */
               val copyForMemory = ByteBuffer.allocate(bytes.limit)
               copyForMemory.put(bytes)
               memoryStore.putBytes(blockId, copyForMemory, level)
@@ -442,16 +441,17 @@ private[spark] class BlockManager(
             } else {
               val values = dataDeserialize(blockId, bytes)
               if (level.deserialized) {
-                // Cache the values before returning them:
+                // Cache the values before returning them
                 // TODO: Consider creating a putValues that also takes in a iterator?
                 val valuesBuffer = new ArrayBuffer[Any]
                 valuesBuffer ++= values
-                memoryStore.putValues(blockId, valuesBuffer, level, true).data match {
-                  case Left(values2) =>
-                    return Some(values2)
-                  case _ =>
-                    throw new Exception("Memory store did not return back an iterator")
-                }
+                memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
+                  match {
+                    case Left(values2) =>
+                      return Some(values2)
+                    case _ =>
+                      throw new SparkException("Memory store did not return an iterator")
+                  }
               } else {
                 return Some(values)
               }
@@ -460,7 +460,7 @@ private[spark] class BlockManager(
         }
       }
     } else {
-      logDebug("Block " + blockId + " not registered locally")
+      logDebug(s"Block $blockId not registered locally")
     }
     None
   }
@@ -469,7 +469,7 @@ private[spark] class BlockManager(
    * Get block from remote block managers.
    */
   def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
-    logDebug("Getting remote block " + blockId)
+    logDebug(s"Getting remote block $blockId")
     doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
   }
 
@@ -477,7 +477,7 @@ private[spark] class BlockManager(
    * Get block from remote block managers as serialized bytes.
    */
   def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
-    logDebug("Getting remote block " + blockId + " as bytes")
+    logDebug(s"Getting remote block $blockId as bytes")
     doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
   }
 
@@ -485,7 +485,7 @@ private[spark] class BlockManager(
     require(blockId != null, "BlockId is null")
     val locations = Random.shuffle(master.getLocations(blockId))
     for (loc <- locations) {
-      logDebug("Getting remote block " + blockId + " from " + loc)
+      logDebug(s"Getting remote block $blockId from $loc")
       val data = BlockManagerWorker.syncGetBlock(
         GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
       if (data != null) {
@@ -495,9 +495,9 @@ private[spark] class BlockManager(
           return Some(data)
         }
       }
-      logDebug("The value of block " + blockId + " is null")
+      logDebug(s"The value of block $blockId is null")
     }
-    logDebug("Block " + blockId + " not found")
+    logDebug(s"Block $blockId not found")
     None
   }
 
@@ -507,12 +507,12 @@ private[spark] class BlockManager(
   def get(blockId: BlockId): Option[Iterator[Any]] = {
     val local = getLocal(blockId)
     if (local.isDefined) {
-      logInfo("Found block %s locally".format(blockId))
+      logInfo(s"Found block $blockId locally")
       return local
     }
     val remote = getRemote(blockId)
     if (remote.isDefined) {
-      logInfo("Found block %s remotely".format(blockId))
+      logInfo(s"Found block $blockId remotely")
       return remote
     }
     None
@@ -533,7 +533,6 @@ private[spark] class BlockManager(
       } else {
         new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
       }
-
     iter.initialize()
     iter
   }
@@ -543,6 +542,7 @@ private[spark] class BlockManager(
       values: Iterator[Any],
       level: StorageLevel,
       tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
+    require(values != null, "Values is null")
     doPut(blockId, IteratorValues(values), level, tellMaster)
   }
 
@@ -562,8 +562,8 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Put a new block of values to the block manager. Return a list of blocks updated as a
-   * result of this put.
+   * Put a new block of values to the block manager.
+   * Return a list of blocks updated as a result of this put.
    */
   def put(
       blockId: BlockId,
@@ -575,8 +575,8 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Put a new block of serialized bytes to the block manager. Return a list of blocks updated
-   * as a result of this put.
+   * Put a new block of serialized bytes to the block manager.
+   * Return a list of blocks updated as a result of this put.
    */
   def putBytes(
       blockId: BlockId,
@@ -589,7 +589,7 @@ private[spark] class BlockManager(
 
   private def doPut(
       blockId: BlockId,
-      data: Values,
+      data: BlockValues,
       level: StorageLevel,
       tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
 
@@ -599,20 +599,18 @@ private[spark] class BlockManager(
     // Return value
     val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
 
-    // Remember the block's storage level so that we can correctly drop it to disk if it needs
-    // to be dropped right after it got put into memory. Note, however, that other threads will
-    // not be able to get() this block until we call markReady on its BlockInfo.
+    /* Remember the block's storage level so that we can correctly drop it to disk if it needs
+     * to be dropped right after it got put into memory. Note, however, that other threads will
+     * not be able to get() this block until we call markReady on its BlockInfo. */
     val putBlockInfo = {
       val tinfo = new BlockInfo(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
-
       if (oldBlockOpt.isDefined) {
         if (oldBlockOpt.get.waitForReady()) {
-          logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
+          logWarning(s"Block $blockId already exists on this machine; not re-adding it")
           return updatedBlocks
         }
-
         // TODO: So the block info exists - but previous attempt to load it (?) failed.
         // What do we do now ? Retry on it ?
         oldBlockOpt.get
@@ -623,10 +621,10 @@ private[spark] class BlockManager(
 
     val startTimeMs = System.currentTimeMillis
 
-    // If we're storing values and we need to replicate the data, we'll want access to the values,
-    // but because our put will read the whole iterator, there will be no values left. For the
-    // case where the put serializes data, we'll remember the bytes, above; but for the case where
-    // it doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
+    /* If we're storing values and we need to replicate the data, we'll want access to the values,
+     * but because our put will read the whole iterator, there will be no values left. For the
+     * case where the put serializes data, we'll remember the bytes, above; but for the case where
+     * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */
     var valuesAfterPut: Iterator[Any] = null
 
     // Ditto for the bytes after the put
@@ -637,78 +635,62 @@ private[spark] class BlockManager(
 
     // If we're storing bytes, then initiate the replication before storing them locally.
     // This is faster as data is already serialized and ready to send.
-    val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
-      // Duplicate doesn't copy the bytes, just creates a wrapper
-      val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
-      Future {
-        replicate(blockId, bufferView, level)
-      }
-    } else {
-      null
+    val replicationFuture = data match {
+      case b: ByteBufferValues if level.replication > 1 =>
+        // Duplicate doesn't copy the bytes, but just creates a wrapper
+        val bufferView = b.buffer.duplicate()
+        Future { replicate(blockId, bufferView, level) }
+      case _ => null
     }
 
     putBlockInfo.synchronized {
-      logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
-        + " to get into synchronized block")
+      logTrace("Put for block %s took %s to get into synchronized block"
+        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
 
       var marked = false
       try {
-        if (level.useMemory) {
-          // Save it just to memory first, even if it also has useDisk set to true; we will
-          // drop it to disk later if the memory store can't hold it.
-          val res = data match {
-            case IteratorValues(iterator) =>
-              memoryStore.putValues(blockId, iterator, level, true)
-            case ArrayBufferValues(array) =>
-              memoryStore.putValues(blockId, array, level, true)
-            case ByteBufferValues(bytes) =>
-              bytes.rewind()
-              memoryStore.putBytes(blockId, bytes, level)
-          }
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case Left(newIterator) => valuesAfterPut = newIterator
-          }
-          // Keep track of which blocks are dropped from memory
-          res.droppedBlocks.foreach { block => updatedBlocks += block }
-        } else if (level.useOffHeap) {
-          // Save to Tachyon.
-          val res = data match {
-            case IteratorValues(iterator) =>
-              tachyonStore.putValues(blockId, iterator, level, false)
-            case ArrayBufferValues(array) =>
-              tachyonStore.putValues(blockId, array, level, false)
-            case ByteBufferValues(bytes) =>
-              bytes.rewind()
-              tachyonStore.putBytes(blockId, bytes, level)
-          }
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case _ =>
-          }
-        } else {
-          // Save directly to disk.
-          // Don't get back the bytes unless we replicate them.
-          val askForBytes = level.replication > 1
-
-          val res = data match {
-            case IteratorValues(iterator) =>
-              diskStore.putValues(blockId, iterator, level, askForBytes)
-            case ArrayBufferValues(array) =>
-              diskStore.putValues(blockId, array, level, askForBytes)
-            case ByteBufferValues(bytes) =>
-              bytes.rewind()
-              diskStore.putBytes(blockId, bytes, level)
-          }
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case _ =>
+        // returnValues - Whether to return the values put
+        // blockStore - The type of storage to put these values into
+        val (returnValues, blockStore: BlockStore) = {
+          if (level.useMemory) {
+            // Put it in memory first, even if it also has useDisk set to true;
+            // We will drop it to disk later if the memory store can't hold it.
+            (true, memoryStore)
+          } else if (level.useOffHeap) {
+            // Use tachyon for off-heap storage
+            (false, tachyonStore)
+          } else if (level.useDisk) {
+            // Don't get back the bytes from put unless we replicate them
+            (level.replication > 1, diskStore)
+          } else {
+            assert(level == StorageLevel.NONE)
+            throw new BlockException(
+              blockId, s"Attempted to put block $blockId without specifying storage level!")
           }
         }
 
+        // Actually put the values
+        val result = data match {
+          case IteratorValues(iterator) =>
+            blockStore.putValues(blockId, iterator, level, returnValues)
+          case ArrayBufferValues(array) =>
+            blockStore.putValues(blockId, array, level, returnValues)
+          case ByteBufferValues(bytes) =>
+            bytes.rewind()
+            blockStore.putBytes(blockId, bytes, level)
+        }
+        size = result.size
+        result.data match {
+          case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator
+          case Right (newBytes) => bytesAfterPut = newBytes
+          case _ =>
+        }
+
+        // Keep track of which blocks are dropped from memory
+        if (level.useMemory) {
+          result.droppedBlocks.foreach { updatedBlocks += _ }
+        }
+
         val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
         if (putBlockStatus.storageLevel != StorageLevel.NONE) {
           // Now that the block is in either the memory, tachyon, or disk store,
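
The hunk above is the heart of commit 94a425b: rather than three near-identical branches for memory, Tachyon, and disk, doPut first picks a (returnValues, blockStore) pair and then runs a single dispatch over BlockValues. The selection step, isolated as a sketch (simplified stand-in types, not the real BlockStore interface):

    trait Store
    object MemoryStore extends Store
    object TachyonStore extends Store
    object DiskStore extends Store

    def selectStore(
        useMemory: Boolean,
        useOffHeap: Boolean,
        useDisk: Boolean,
        replication: Int): (Boolean, Store) = {
      if (useMemory) {
        (true, MemoryStore)           // keep the values; may drop to disk later
      } else if (useOffHeap) {
        (false, TachyonStore)
      } else if (useDisk) {
        (replication > 1, DiskStore)  // only read bytes back if we must replicate
      } else {
        throw new IllegalArgumentException("no storage level specified")
      }
    }
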
@@ -728,18 +710,21 @@ private[spark] class BlockManager(
           // could've inserted a new BlockInfo before we remove it.
           blockInfo.remove(blockId)
           putBlockInfo.markFailure()
-          logWarning("Putting block " + blockId + " failed")
+          logWarning(s"Putting block $blockId failed")
         }
       }
     }
-    logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
+    logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
 
     // Either we're storing bytes and we asynchronously started replication, or we're storing
     // values and need to serialize and replicate them now:
     if (level.replication > 1) {
       data match {
-        case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
-        case _ => {
+        case ByteBufferValues(bytes) =>
+          if (replicationFuture != null) {
+            Await.ready(replicationFuture, Duration.Inf)
+          }
+        case _ =>
           val remoteStartTime = System.currentTimeMillis
           // Serialize the block if not already done
           if (bytesAfterPut == null) {
@@ -750,20 +735,19 @@ private[spark] class BlockManager(
             bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
           }
           replicate(blockId, bytesAfterPut, level)
-          logDebug("Put block " + blockId + " remotely took " +
-            Utils.getUsedTimeMs(remoteStartTime))
-        }
+          logDebug("Put block %s remotely took %s"
+            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
       }
     }
 
     BlockManager.dispose(bytesAfterPut)
 
     if (level.replication > 1) {
-      logDebug("Put for block " + blockId + " with replication took " +
-        Utils.getUsedTimeMs(startTimeMs))
+      logDebug("Putting block %s with replication took %s"
+        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     } else {
-      logDebug("Put for block " + blockId + " without replication took " +
-        Utils.getUsedTimeMs(startTimeMs))
+      logDebug("Putting block %s without replication took %s"
+        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     }
 
     updatedBlocks
@@ -773,7 +757,7 @@ private[spark] class BlockManager(
    * Replicate block to another node.
    */
   @volatile var cachedPeers: Seq[BlockManagerId] = null
-  private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
+  private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
     if (cachedPeers == null) {
@@ -782,15 +766,16 @@ private[spark] class BlockManager(
     for (peer: BlockManagerId <- cachedPeers) {
       val start = System.nanoTime
       data.rewind()
-      logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
-        + data.limit() + " Bytes. To node: " + peer)
-      if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
-        new ConnectionManagerId(peer.host, peer.port))) {
-        logError("Failed to call syncPutBlock to " + peer)
+      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
+        s"To node: $peer")
+      val putBlock = PutBlock(blockId, data, tLevel)
+      val cmId = new ConnectionManagerId(peer.host, peer.port)
+      val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
+      if (!syncPutBlockSuccess) {
+        logError(s"Failed to call syncPutBlock to $peer")
       }
-      logDebug("Replicated BlockId " + blockId + " once used " +
-        (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
-        data.limit() + " bytes.")
+      logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
+        .format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
     }
   }
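
A note on the replicate hunks above: before forwarding, replicate rebuilds the storage level with replication forced to 1 (the tLevel above), so a peer stores a single copy instead of fanning out again. Roughly (assuming a StorageLevel-like case class):

    case class Level(
        useDisk: Boolean,
        useMemory: Boolean,
        useOffHeap: Boolean,
        deserialized: Boolean,
        replication: Int)

    // Peers receive replication = 1, so replication never cascades
    def peerLevel(level: Level): Level = level.copy(replication = 1)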
 
@@ -822,17 +807,17 @@ private[spark] class BlockManager(
       blockId: BlockId,
       data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
 
-    logInfo("Dropping block " + blockId + " from memory")
+    logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId).orNull
 
     // If the block has not already been dropped
-    if (info != null)  {
+    if (info != null) {
       info.synchronized {
         // required ? As of now, this will be invoked only for blocks which are ready
         // But in case this changes in future, adding for consistency sake.
         if (!info.waitForReady()) {
           // If we get here, the block write failed.
-          logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
+          logWarning(s"Block $blockId was marked as failure. Nothing to drop")
           return None
         }
 
@@ -841,10 +826,10 @@ private[spark] class BlockManager(
 
         // Drop to disk, if storage level requires
         if (level.useDisk && !diskStore.contains(blockId)) {
-          logInfo("Writing block " + blockId + " to disk")
+          logInfo(s"Writing block $blockId to disk")
           data match {
             case Left(elements) =>
-              diskStore.putValues(blockId, elements, level, false)
+              diskStore.putValues(blockId, elements, level, returnValues = false)
             case Right(bytes) =>
               diskStore.putBytes(blockId, bytes, level)
           }
@@ -858,7 +843,7 @@ private[spark] class BlockManager(
         if (blockIsRemoved) {
           blockIsUpdated = true
         } else {
-          logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
+          logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
         }
 
         val status = getCurrentBlockStatus(blockId, info)
@@ -883,7 +868,7 @@ private[spark] class BlockManager(
    */
   def removeRdd(rddId: Int): Int = {
     // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
-    logInfo("Removing RDD " + rddId)
+    logInfo(s"Removing RDD $rddId")
     val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
     blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
     blocksToRemove.size
@@ -893,7 +878,7 @@ private[spark] class BlockManager(
    * Remove all blocks belonging to the given broadcast.
    */
   def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
-    logInfo("Removing broadcast " + broadcastId)
+    logInfo(s"Removing broadcast $broadcastId")
     val blocksToRemove = blockInfo.keys.collect {
       case bid @ BroadcastBlockId(`broadcastId`, _) => bid
     }
@@ -904,40 +889,42 @@ private[spark] class BlockManager(
   /**
    * Remove a block from both memory and disk.
    */
-  def removeBlock(blockId: BlockId, tellMaster: Boolean = true) {
-    logInfo("Removing block " + blockId)
+  def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
+    logInfo(s"Removing block $blockId")
     val info = blockInfo.get(blockId).orNull
-    if (info != null) info.synchronized {
-      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-      val removedFromMemory = memoryStore.remove(blockId)
-      val removedFromDisk = diskStore.remove(blockId)
-      val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
-      if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
-        logWarning("Block " + blockId + " could not be removed as it was not found in either " +
-          "the disk, memory, or tachyon store")
-      }
-      blockInfo.remove(blockId)
-      if (tellMaster && info.tellMaster) {
-        val status = getCurrentBlockStatus(blockId, info)
-        reportBlockStatus(blockId, info, status)
+    if (info != null) {
+      info.synchronized {
+        // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+        val removedFromMemory = memoryStore.remove(blockId)
+        val removedFromDisk = diskStore.remove(blockId)
+        val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
+        if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
+          logWarning(s"Block $blockId could not be removed as it was not found in either " +
+            "the disk, memory, or tachyon store")
+        }
+        blockInfo.remove(blockId)
+        if (tellMaster && info.tellMaster) {
+          val status = getCurrentBlockStatus(blockId, info)
+          reportBlockStatus(blockId, info, status)
+        }
       }
     } else {
       // The block has already been removed; do nothing.
-      logWarning("Asked to remove block " + blockId + ", which does not exist")
+      logWarning(s"Asked to remove block $blockId, which does not exist")
     }
   }
 
-  private def dropOldNonBroadcastBlocks(cleanupTime: Long) {
-    logInfo("Dropping non broadcast blocks older than " + cleanupTime)
+  private def dropOldNonBroadcastBlocks(cleanupTime: Long): Unit = {
+    logInfo(s"Dropping non broadcast blocks older than $cleanupTime")
     dropOldBlocks(cleanupTime, !_.isBroadcast)
   }
 
-  private def dropOldBroadcastBlocks(cleanupTime: Long) {
-    logInfo("Dropping broadcast blocks older than " + cleanupTime)
+  private def dropOldBroadcastBlocks(cleanupTime: Long): Unit = {
+    logInfo(s"Dropping broadcast blocks older than $cleanupTime")
     dropOldBlocks(cleanupTime, _.isBroadcast)
   }
 
-  private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)) {
+  private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)): Unit = {
     val iterator = blockInfo.getEntrySet.iterator
     while (iterator.hasNext) {
       val entry = iterator.next()
@@ -945,17 +932,11 @@ private[spark] class BlockManager(
       if (time < cleanupTime && shouldDrop(id)) {
         info.synchronized {
           val level = info.level
-          if (level.useMemory) {
-            memoryStore.remove(id)
-          }
-          if (level.useDisk) {
-            diskStore.remove(id)
-          }
-          if (level.useOffHeap) {
-            tachyonStore.remove(id)
-          }
+          if (level.useMemory) { memoryStore.remove(id) }
+          if (level.useDisk) { diskStore.remove(id) }
+          if (level.useOffHeap) { tachyonStore.remove(id) }
           iterator.remove()
-          logInfo("Dropped block " + id)
+          logInfo(s"Dropped block $id")
         }
         val status = getCurrentBlockStatus(id, info)
         reportBlockStatus(id, info, status)
@@ -963,12 +944,14 @@ private[spark] class BlockManager(
     }
   }
 
-  def shouldCompress(blockId: BlockId): Boolean = blockId match {
-    case ShuffleBlockId(_, _, _) => compressShuffle
-    case BroadcastBlockId(_, _) => compressBroadcast
-    case RDDBlockId(_, _) => compressRdds
-    case TempBlockId(_) => compressShuffleSpill
-    case _ => false
+  private def shouldCompress(blockId: BlockId): Boolean = {
+    blockId match {
+      case _: ShuffleBlockId => compressShuffle
+      case _: BroadcastBlockId => compressBroadcast
+      case _: RDDBlockId => compressRdds
+      case _: TempBlockId => compressShuffleSpill
+      case _ => false
+    }
   }
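
The shouldCompress rewrite above swaps constructor patterns with wildcard fields (ShuffleBlockId(_, _, _)) for type patterns (case _: ShuffleBlockId); both match the same values, but a type pattern does not need updating when a case class gains a field. Illustrated on a toy hierarchy (hypothetical types):

    sealed trait Id
    case class ShuffleId(mapId: Int, reduceId: Int) extends Id
    case class RddId(rddId: Int, splitIndex: Int) extends Id

    def compress(id: Id): Boolean = id match {
      // Type patterns: no field arity to keep in sync with the case classes
      case _: ShuffleId => true
      case _: RddId     => false
    }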
 
   /**
@@ -990,7 +973,7 @@ private[spark] class BlockManager(
       blockId: BlockId,
       outputStream: OutputStream,
       values: Iterator[Any],
-      serializer: Serializer = defaultSerializer) {
+      serializer: Serializer = defaultSerializer): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
     val ser = serializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
@@ -1016,16 +999,16 @@ private[spark] class BlockManager(
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
 
-    def getIterator = {
+    def getIterator: Iterator[Any] = {
       val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
       serializer.newInstance().deserializeStream(stream).asIterator
     }
 
     if (blockId.isShuffle) {
-      // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
-      // at the beginning. The wrapping will cost some memory (compression instance
-      // initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
-      // wrapping lazily to save memory.
+      /* Reducer may need to read many local shuffle blocks and will wrap them into Iterators
+       * at the beginning. The wrapping will cost some memory (compression instance
+       * initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
+       * wrapping lazily to save memory. */
       class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
         lazy val proxy = f
         override def hasNext: Boolean = proxy.hasNext
@@ -1037,7 +1020,7 @@ private[spark] class BlockManager(
     }
   }
 
-  def stop() {
+  def stop(): Unit = {
     if (heartBeatTask != null) {
       heartBeatTask.cancel()
     }
@@ -1059,9 +1042,9 @@ private[spark] class BlockManager(
 
 
 private[spark] object BlockManager extends Logging {
-  val ID_GENERATOR = new IdGenerator
+  private val ID_GENERATOR = new IdGenerator
 
-  def getMaxMemory(conf: SparkConf): Long = {
+  private def getMaxMemory(conf: SparkConf): Long = {
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
@@ -1078,9 +1061,9 @@ private[spark] object BlockManager extends Logging {
    * waiting for the GC to find it because that could lead to huge numbers of open files. There's
    * unfortunately no standard API to do this.
    */
-  def dispose(buffer: ByteBuffer) {
+  def dispose(buffer: ByteBuffer): Unit = {
     if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace("Unmapping " + buffer)
+      logTrace(s"Unmapping $buffer")
       if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
         buffer.asInstanceOf[DirectBuffer].cleaner().clean()
       }
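
As the doc comment above explains, there is no standard API for unmapping a MappedByteBuffer, so dispose reaches into the non-public sun.nio.ch.DirectBuffer cleaner. That shape, isolated (compiles only where sun.nio.ch is accessible, e.g. pre-module-system JDKs):

    import java.nio.{ByteBuffer, MappedByteBuffer}
    import sun.nio.ch.DirectBuffer

    def dispose(buffer: ByteBuffer): Unit = {
      if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
        val cleaner = buffer.asInstanceOf[DirectBuffer].cleaner()
        if (cleaner != null) {
          cleaner.clean()  // unmap now rather than waiting for the GC
        }
      }
    }

The put path above calls it as BlockManager.dispose(bytesAfterPut) once the serialized bytes are no longer needed.
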
@@ -1093,7 +1076,7 @@ private[spark] object BlockManager extends Logging {
       blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = {
 
     // blockManagerMaster != null is used in tests
-    assert (env != null || blockManagerMaster != null)
+    assert(env != null || blockManagerMaster != null)
     val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
       env.blockManager.getLocationBlockIds(blockIds)
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/44daec5a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 3a7243a..2ec46d4 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -40,9 +40,9 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64)
 
-  // Create one local directory for each path mentioned in spark.local.dir; then, inside this
-  // directory, create multiple subdirectories that we will hash files into, in order to avoid
-  // having really large inodes at the top level.
+  /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
+   * directory, create multiple subdirectories that we will hash files into, in order to avoid
+   * having really large inodes at the top level. */
   private val localDirs: Array[File] = createLocalDirs()
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
   private var shuffleSender : ShuffleSender = null
@@ -114,7 +114,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   }
 
   private def createLocalDirs(): Array[File] = {
-    logDebug("Creating local directories at root dirs '" + rootDirs + "'")
+    logDebug(s"Creating local directories at root dirs '$rootDirs'")
     val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
     rootDirs.split(",").map { rootDir =>
       var foundLocalDir = false
@@ -126,21 +126,20 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
         tries += 1
         try {
           localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
-          localDir = new File(rootDir, "spark-local-" + localDirId)
+          localDir = new File(rootDir, s"spark-local-$localDirId")
           if (!localDir.exists) {
             foundLocalDir = localDir.mkdirs()
           }
         } catch {
           case e: Exception =>
-            logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e)
+            logWarning(s"Attempt $tries to create local dir $localDir failed", e)
         }
       }
       if (!foundLocalDir) {
-        logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
-          " attempts to create local dir in " + rootDir)
+        logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir")
         System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
       }
-      logInfo("Created local directory at " + localDir)
+      logInfo(s"Created local directory at $localDir")
       localDir
     }
   }
@@ -163,7 +162,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
           if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
         } catch {
           case e: Exception =>
-            logError("Exception while deleting local spark dir: " + localDir, e)
+            logError(s"Exception while deleting local spark dir: $localDir", e)
         }
       }
     }
@@ -175,7 +174,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
 
   private[storage] def startShuffleBlockSender(port: Int): Int = {
     shuffleSender = new ShuffleSender(port, this)
-    logInfo("Created ShuffleSender binding to port : " + shuffleSender.port)
+    logInfo(s"Created ShuffleSender binding to port: ${shuffleSender.port}")
     shuffleSender.port
   }
 }

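For context on createLocalDirs(): each attempt combines a timestamp with a
random 16-bit suffix so that concurrent processes sharing the same root
directory do not collide, giving up after MAX_DIR_CREATION_ATTEMPTS. A
self-contained sketch of that retry pattern, with hypothetical names
(LocalDirExample, MaxAttempts, the "local-" prefix) standing in for the real
ones:

import java.io.{File, IOException}
import java.text.SimpleDateFormat
import java.util.Date

import scala.util.Random

object LocalDirExample {
  private val MaxAttempts = 10

  def createLocalDir(rootDir: String): File = {
    val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
    val rand = new Random
    var tries = 0
    while (tries < MaxAttempts) {
      tries += 1
      // Timestamp plus random suffix keeps concurrent creators from colliding
      val dirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
      val dir = new File(rootDir, s"local-$dirId")
      if (!dir.exists && dir.mkdirs()) {
        return dir
      }
    }
    throw new IOException(s"Failed $MaxAttempts attempts to create local dir in $rootDir")
  }
}
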
http://git-wip-us.apache.org/repos/asf/spark/blob/44daec5a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 0ab9fad..ebff0cb 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -39,41 +39,39 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     diskManager.getBlockLocation(blockId).length
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
     // So that we do not modify the input offsets!
     // duplicate does not copy buffer, so inexpensive
     val bytes = _bytes.duplicate()
-    logDebug("Attempting to put block " + blockId)
+    logDebug(s"Attempting to put block $blockId")
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
-    val channel = new FileOutputStream(file).getChannel()
+    val channel = new FileOutputStream(file).getChannel
     while (bytes.remaining > 0) {
       channel.write(bytes)
     }
     channel.close()
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
-      file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
-    return PutResult(bytes.limit(), Right(bytes.duplicate()))
+      file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
+    PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
   override def putValues(
       blockId: BlockId,
       values: ArrayBuffer[Any],
       level: StorageLevel,
-      returnValues: Boolean)
-  : PutResult = {
-    return putValues(blockId, values.toIterator, level, returnValues)
+      returnValues: Boolean): PutResult = {
+    putValues(blockId, values.toIterator, level, returnValues)
   }
 
   override def putValues(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      returnValues: Boolean)
-    : PutResult = {
+      returnValues: Boolean): PutResult = {
 
-    logDebug("Attempting to write values for block " + blockId)
+    logDebug(s"Attempting to write values for block $blockId")
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
@@ -95,7 +93,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
 
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
     val segment = diskManager.getBlockLocation(blockId)
-    val channel = new RandomAccessFile(segment.file, "r").getChannel()
+    val channel = new RandomAccessFile(segment.file, "r").getChannel
 
     try {
       // For small files, directly read rather than memory map
@@ -131,7 +129,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
       file.delete()
     } else {
       if (fileSegment.length < file.length()) {
-        logWarning("Could not delete block associated with only a part of a file: " + blockId)
+        logWarning(s"Could not delete block associated with only a part of a file: $blockId")
       }
       false
     }

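The putBytes() cleanup above keeps the duplicate-then-write idiom: duplicate()
shares the underlying bytes but carries its own position and limit, so
draining the copy leaves the caller's buffer untouched, and the loop is needed
because a single FileChannel.write may write fewer bytes than remain. A short
sketch of just that idiom (the object and method names are illustrative):

import java.io.FileOutputStream
import java.nio.ByteBuffer

object WriteBufferExample {
  def writeFully(path: String, _bytes: ByteBuffer): Unit = {
    val bytes = _bytes.duplicate()  // no data copy; independent offsets
    val channel = new FileOutputStream(path).getChannel
    try {
      // A single write() call may be partial, so loop until drained
      while (bytes.remaining > 0) {
        channel.write(bytes)
      }
    } finally {
      channel.close()
    }
  }
}
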
http://git-wip-us.apache.org/repos/asf/spark/blob/44daec5a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 488f1ea..084a566 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -24,6 +24,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.util.{SizeEstimator, Utils}
 
+private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
+
 /**
  * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
  * serialized ByteBuffers.
@@ -31,15 +33,13 @@ import org.apache.spark.util.{SizeEstimator, Utils}
 private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   extends BlockStore(blockManager) {
 
-  case class Entry(value: Any, size: Long, deserialized: Boolean)
-
-  private val entries = new LinkedHashMap[BlockId, Entry](32, 0.75f, true)
+  private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
   @volatile private var currentMemory = 0L
   // Object used to ensure that only one thread is putting blocks and if necessary, dropping
   // blocks from the memory store.
   private val putLock = new Object()
 
-  logInfo("MemoryStore started with capacity %s.".format(Utils.bytesToString(maxMemory)))
+  logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
 
   def freeMemory: Long = maxMemory - currentMemory
 
@@ -101,7 +101,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     } else if (entry.deserialized) {
       Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
     } else {
-      Some(entry.value.asInstanceOf[ByteBuffer].duplicate())   // Doesn't actually copy the data
+      Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
     }
   }
 
@@ -124,8 +124,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val entry = entries.remove(blockId)
       if (entry != null) {
         currentMemory -= entry.size
-        logInfo("Block %s of size %d dropped from memory (free %d)".format(
-          blockId, entry.size, freeMemory))
+        logInfo(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)")
         true
       } else {
         false
@@ -181,18 +180,14 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       droppedBlocks ++= freeSpaceResult.droppedBlocks
 
       if (enoughFreeSpace) {
-        val entry = new Entry(value, size, deserialized)
+        val entry = new MemoryEntry(value, size, deserialized)
         entries.synchronized {
           entries.put(blockId, entry)
           currentMemory += size
         }
-        if (deserialized) {
-          logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
-            blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
-        } else {
-          logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
-            blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
-        }
+        val valuesOrBytes = if (deserialized) "values" else "bytes"
+        logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
+          blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
         putSuccess = true
       } else {
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
@@ -221,13 +216,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Return whether there is enough free space, along with the blocks dropped in the process.
    */
   private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = {
-    logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
-      space, currentMemory, maxMemory))
+    logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")
 
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
 
     if (space > maxMemory) {
-      logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
+      logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit")
       return ResultWithDroppedBlocks(success = false, droppedBlocks)
     }
 
@@ -252,7 +246,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       }
 
       if (maxMemory - (currentMemory - selectedMemory) >= space) {
-        logInfo(selectedBlocks.size + " blocks selected for dropping")
+        logInfo(s"${selectedBlocks.size} blocks selected for dropping")
         for (blockId <- selectedBlocks) {
           val entry = entries.synchronized { entries.get(blockId) }
           // This should never be null as only one thread should be dropping

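One detail worth calling out in MemoryStore: the entries map is constructed
with accessOrder = true, so the java.util.LinkedHashMap iterates
least-recently-used entries first, which is the order ensureFreeSpace() walks
when selecting blocks to drop. A tiny sketch demonstrating that ordering (the
block names are made up):

import java.util.LinkedHashMap

object LruOrderExample {
  def main(args: Array[String]): Unit = {
    // Third constructor argument enables access order instead of insertion order
    val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("rdd_0_0", 100L)
    entries.put("rdd_0_1", 200L)
    entries.put("rdd_0_2", 300L)

    entries.get("rdd_0_0")  // touching a key moves it to the back

    val it = entries.keySet.iterator
    while (it.hasNext) {
      println(it.next())  // prints rdd_0_1, rdd_0_2, rdd_0_0 (LRU first)
    }
  }
}
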
http://git-wip-us.apache.org/repos/asf/spark/blob/44daec5a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 2d8ff11..1e35aba 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -34,11 +34,11 @@ import org.apache.spark.annotation.DeveloperApi
  */
 @DeveloperApi
 class StorageLevel private(
-    private var useDisk_ : Boolean,
-    private var useMemory_ : Boolean,
-    private var useOffHeap_ : Boolean,
-    private var deserialized_ : Boolean,
-    private var replication_ : Int = 1)
+    private var _useDisk: Boolean,
+    private var _useMemory: Boolean,
+    private var _useOffHeap: Boolean,
+    private var _deserialized: Boolean,
+    private var _replication: Int = 1)
   extends Externalizable {
 
   // TODO: Also add fields for caching priority, dataset ID, and flushing.
@@ -48,13 +48,13 @@ class StorageLevel private(
 
   def this() = this(false, true, false, false)  // For deserialization
 
-  def useDisk = useDisk_
-  def useMemory = useMemory_
-  def useOffHeap = useOffHeap_
-  def deserialized = deserialized_
-  def replication = replication_
+  def useDisk = _useDisk
+  def useMemory = _useMemory
+  def useOffHeap = _useOffHeap
+  def deserialized = _deserialized
+  def replication = _replication
 
-  assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
+  assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")
 
   if (useOffHeap) {
     require(!useDisk, "Off-heap storage level does not support using disk")
@@ -63,8 +63,9 @@ class StorageLevel private(
     require(replication == 1, "Off-heap storage level does not support multiple replication")
   }
 
-  override def clone(): StorageLevel = new StorageLevel(
-    this.useDisk, this.useMemory, this.useOffHeap, this.deserialized, this.replication)
+  override def clone(): StorageLevel = {
+    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
+  }
 
   override def equals(other: Any): Boolean = other match {
     case s: StorageLevel =>
@@ -77,20 +78,20 @@ class StorageLevel private(
       false
   }
 
-  def isValid = ((useMemory || useDisk || useOffHeap) && (replication > 0))
+  def isValid = (useMemory || useDisk || useOffHeap) && (replication > 0)
 
   def toInt: Int = {
     var ret = 0
-    if (useDisk_) {
+    if (_useDisk) {
       ret |= 8
     }
-    if (useMemory_) {
+    if (_useMemory) {
       ret |= 4
     }
-    if (useOffHeap_) {
+    if (_useOffHeap) {
       ret |= 2
     }
-    if (deserialized_) {
+    if (_deserialized) {
       ret |= 1
     }
     ret
@@ -98,32 +99,34 @@ class StorageLevel private(
 
   override def writeExternal(out: ObjectOutput) {
     out.writeByte(toInt)
-    out.writeByte(replication_)
+    out.writeByte(_replication)
   }
 
   override def readExternal(in: ObjectInput) {
     val flags = in.readByte()
-    useDisk_ = (flags & 8) != 0
-    useMemory_ = (flags & 4) != 0
-    useOffHeap_ = (flags & 2) != 0
-    deserialized_ = (flags & 1) != 0
-    replication_ = in.readByte()
+    _useDisk = (flags & 8) != 0
+    _useMemory = (flags & 4) != 0
+    _useOffHeap = (flags & 2) != 0
+    _deserialized = (flags & 1) != 0
+    _replication = in.readByte()
   }
 
   @throws(classOf[IOException])
   private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
 
-  override def toString: String = "StorageLevel(%b, %b, %b, %b, %d)".format(
-    useDisk, useMemory, useOffHeap, deserialized, replication)
+  override def toString: String = {
+    s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $replication)"
+  }
 
   override def hashCode(): Int = toInt * 41 + replication
-  def description : String = {
+
+  def description: String = {
     var result = ""
     result += (if (useDisk) "Disk " else "")
     result += (if (useMemory) "Memory " else "")
     result += (if (useOffHeap) "Tachyon " else "")
     result += (if (deserialized) "Deserialized " else "Serialized ")
-    result += "%sx Replicated".format(replication)
+    result += s"${replication}x Replicated"
     result
   }
 }
@@ -165,7 +168,7 @@ object StorageLevel {
     case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
     case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
     case "OFF_HEAP" => OFF_HEAP
-    case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + s)
+    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
   }
 
   /**
@@ -173,26 +176,37 @@ object StorageLevel {
    * Create a new StorageLevel object without setting useOffHeap.
    */
   @DeveloperApi
-  def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
-    deserialized: Boolean, replication: Int) = getCachedStorageLevel(
+  def apply(
+      useDisk: Boolean,
+      useMemory: Boolean,
+      useOffHeap: Boolean,
+      deserialized: Boolean,
+      replication: Int) = {
+    getCachedStorageLevel(
       new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
+  }
 
   /**
    * :: DeveloperApi ::
    * Create a new StorageLevel object.
    */
   @DeveloperApi
-  def apply(useDisk: Boolean, useMemory: Boolean,
-    deserialized: Boolean, replication: Int = 1) = getCachedStorageLevel(
-      new StorageLevel(useDisk, useMemory, false, deserialized, replication))
+  def apply(
+      useDisk: Boolean,
+      useMemory: Boolean,
+      deserialized: Boolean,
+      replication: Int = 1) = {
+    getCachedStorageLevel(new StorageLevel(useDisk, useMemory, false, deserialized, replication))
+  }
 
   /**
    * :: DeveloperApi ::
    * Create a new StorageLevel object from its integer representation.
    */
   @DeveloperApi
-  def apply(flags: Int, replication: Int): StorageLevel =
+  def apply(flags: Int, replication: Int): StorageLevel = {
     getCachedStorageLevel(new StorageLevel(flags, replication))
+  }
 
   /**
    * :: DeveloperApi ::
@@ -205,8 +219,8 @@ object StorageLevel {
     getCachedStorageLevel(obj)
   }
 
-  private[spark]
-  val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
+  private[spark] val storageLevelCache =
+    new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
 
   private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
     storageLevelCache.putIfAbsent(level, level)

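The toInt/readExternal pair above round-trips a StorageLevel through two
bytes: four boolean flags packed into the low bits of one byte, replication in
the next. A sketch of that packing in isolation, under the same bit
assignments as the diff (the object and method names are illustrative):

object StorageFlagsExample {
  def toInt(useDisk: Boolean, useMemory: Boolean,
      useOffHeap: Boolean, deserialized: Boolean): Int = {
    var ret = 0
    if (useDisk) ret |= 8
    if (useMemory) ret |= 4
    if (useOffHeap) ret |= 2
    if (deserialized) ret |= 1
    ret
  }

  def fromInt(flags: Int): (Boolean, Boolean, Boolean, Boolean) = {
    ((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0)
  }

  def main(args: Array[String]): Unit = {
    // A MEMORY_AND_DISK-style level: disk + memory, on-heap, serialized
    val flags = toInt(useDisk = true, useMemory = true,
      useOffHeap = false, deserialized = false)
    assert(flags == 12)
    assert(fromInt(flags) == (true, true, false, false))
    println(s"packed to $flags")
  }
}
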
http://git-wip-us.apache.org/repos/asf/spark/blob/44daec5a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index c37e76f..d8ff4ff 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -22,15 +22,10 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 
-import tachyon.client.{WriteType, ReadType}
+import tachyon.client.{ReadType, WriteType}
 
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
-import org.apache.spark.serializer.Serializer
-
-
-private class Entry(val size: Long)
-
 
 /**
  * Stores BlockManager blocks on Tachyon.
@@ -46,8 +41,8 @@ private class TachyonStore(
     tachyonManager.getFile(blockId.name).length
   }
 
-  override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult =  {
-    putToTachyonStore(blockId, bytes, true)
+  override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
+    putIntoTachyonStore(blockId, bytes, returnValues = true)
   }
 
   override def putValues(
@@ -55,7 +50,7 @@ private class TachyonStore(
       values: ArrayBuffer[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    return putValues(blockId, values.toIterator, level, returnValues)
+    putValues(blockId, values.toIterator, level, returnValues)
   }
 
   override def putValues(
@@ -63,12 +58,12 @@ private class TachyonStore(
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    logDebug("Attempting to write values for block " + blockId)
-    val _bytes = blockManager.dataSerialize(blockId, values)
-    putToTachyonStore(blockId, _bytes, returnValues)
+    logDebug(s"Attempting to write values for block $blockId")
+    val bytes = blockManager.dataSerialize(blockId, values)
+    putIntoTachyonStore(blockId, bytes, returnValues)
   }
 
-  private def putToTachyonStore(
+  private def putIntoTachyonStore(
       blockId: BlockId,
       bytes: ByteBuffer,
       returnValues: Boolean): PutResult = {
@@ -76,7 +71,7 @@ private class TachyonStore(
     // duplicate does not copy buffer, so inexpensive
     val byteBuffer = bytes.duplicate()
     byteBuffer.rewind()
-    logDebug("Attempting to put block " + blockId + " into Tachyon")
+    logDebug(s"Attempting to put block $blockId into Tachyon")
     val startTime = System.currentTimeMillis
     val file = tachyonManager.getFile(blockId)
     val os = file.getOutStream(WriteType.TRY_CACHE)
@@ -84,7 +79,7 @@ private class TachyonStore(
     os.close()
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file in Tachyon in %d ms".format(
-      blockId, Utils.bytesToString(byteBuffer.limit), (finishTime - startTime)))
+      blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
 
     if (returnValues) {
       PutResult(bytes.limit(), Right(bytes.duplicate()))
@@ -106,10 +101,9 @@ private class TachyonStore(
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
 
-
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
     val file = tachyonManager.getFile(blockId)
-    if (file == null || file.getLocationHosts().size == 0) {
+    if (file == null || file.getLocationHosts.size == 0) {
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
@@ -121,16 +115,15 @@ private class TachyonStore(
         val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
         buffer = ByteBuffer.wrap(bs)
         if (fetchSize != size) {
-          logWarning("Failed to fetch the block " + blockId + " from Tachyon : Size " + size +
-            " is not equal to fetched size " + fetchSize)
+          logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " +
+            s"is not equal to fetched size $fetchSize")
           return None
         }
       }
     } catch {
-        case ioe: IOException => {
-          logWarning("Failed to fetch the block " + blockId + " from Tachyon", ioe)
-          return None
-        }
+      case ioe: IOException =>
+        logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
+        return None
     }
     Some(buffer)
   }

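A readability note on the TachyonStore changes: the call site now passes
returnValues = true by name, the usual Scala remedy for opaque boolean
arguments. A minimal illustration, using a made-up method rather than the real
putIntoTachyonStore:

object NamedArgsExample {
  def put(blockId: String, size: Long, returnValues: Boolean): String = {
    if (returnValues) s"stored $blockId ($size bytes), returning values"
    else s"stored $blockId ($size bytes)"
  }

  def main(args: Array[String]): Unit = {
    println(put("rdd_1_0", 1024L, true))                 // opaque at the call site
    println(put("rdd_1_0", 1024L, returnValues = true))  // self-documenting
  }
}
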
http://git-wip-us.apache.org/repos/asf/spark/blob/44daec5a/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ee62979..042fdfc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,6 +54,8 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
             ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.MemoryStore.Entry"),
+            ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
                 + "createZero$1")
           ) ++
@@ -67,7 +69,10 @@ object MimaExcludes {
           ) ++
           MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
           MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
-          MimaBuild.excludeSparkClass("util.SerializableHyperLogLog")
+          MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
+          MimaBuild.excludeSparkClass("storage.Values") ++
+          MimaBuild.excludeSparkClass("storage.Entry") ++
+          MimaBuild.excludeSparkClass("storage.MemoryStore$Entry")
         case v if v.startsWith("1.0") =>
           Seq(
             MimaBuild.excludeSparkPackage("api.java"),