You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:59 UTC

[15/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
deleted file mode 100644
index 2a6ec2a..0000000
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ /dev/null
@@ -1,1046 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.io.{InputStream, OutputStream}
-import java.nio.{ByteBuffer, MappedByteBuffer}
-
-import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
-
-import akka.actor.{ActorSystem, Cancellable, Props}
-import akka.dispatch.{Await, Future}
-import akka.util.Duration
-import akka.util.duration._
-
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import spark.{Logging, SparkEnv, SparkException, Utils}
-import spark.io.CompressionCodec
-import spark.network._
-import spark.serializer.Serializer
-import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
-
-import sun.nio.ch.DirectBuffer
-
-
-private[spark] class BlockManager(
-    executorId: String,
-    actorSystem: ActorSystem,
-    val master: BlockManagerMaster,
-    val defaultSerializer: Serializer,
-    maxMemory: Long)
-  extends Logging {
-
-  private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
-    @volatile var pending: Boolean = true
-    @volatile var size: Long = -1L
-    @volatile var initThread: Thread = null
-    @volatile var failed = false
-
-    setInitThread()
-
-    private def setInitThread() {
-      // Set current thread as init thread - waitForReady will not block this thread
-      // (in case there is non trivial initialization which ends up calling waitForReady as part of
-      // initialization itself)
-      this.initThread = Thread.currentThread()
-    }
-
-    /**
-     * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
-     * Return true if the block is available, false otherwise.
-     */
-    def waitForReady(): Boolean = {
-      if (initThread != Thread.currentThread() && pending) {
-        synchronized {
-          while (pending) this.wait()
-        }
-      }
-      !failed
-    }
-
-    /** Mark this BlockInfo as ready (i.e. block is finished writing) */
-    def markReady(sizeInBytes: Long) {
-      assert (pending)
-      size = sizeInBytes
-      initThread = null
-      failed = false
-      initThread = null
-      pending = false
-      synchronized {
-        this.notifyAll()
-      }
-    }
-
-    /** Mark this BlockInfo as ready but failed */
-    def markFailure() {
-      assert (pending)
-      size = 0
-      initThread = null
-      failed = true
-      initThread = null
-      pending = false
-      synchronized {
-        this.notifyAll()
-      }
-    }
-  }
-
-  val shuffleBlockManager = new ShuffleBlockManager(this)
-
-  private val blockInfo = new TimeStampedHashMap[String, BlockInfo]
-
-  private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
-  private[storage] val diskStore: DiskStore =
-    new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
-
-  // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
-  private val nettyPort: Int = {
-    val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
-    val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
-    if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
-  }
-
-  val connectionManager = new ConnectionManager(0)
-  implicit val futureExecContext = connectionManager.futureExecContext
-
-  val blockManagerId = BlockManagerId(
-    executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
-
-  // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
-  // for receiving shuffle outputs)
-  val maxBytesInFlight =
-    System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
-
-  // Whether to compress broadcast variables that are stored
-  val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
-  // Whether to compress shuffle output that are stored
-  val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
-  // Whether to compress RDD partitions that are stored serialized
-  val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
-
-  val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
-
-  val hostPort = Utils.localHostPort()
-
-  val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
-    name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
-
-  // Pending reregistration action being executed asynchronously or null if none
-  // is pending. Accesses should synchronize on asyncReregisterLock.
-  var asyncReregisterTask: Future[Unit] = null
-  val asyncReregisterLock = new Object
-
-  private def heartBeat() {
-    if (!master.sendHeartBeat(blockManagerId)) {
-      reregister()
-    }
-  }
-
-  var heartBeatTask: Cancellable = null
-
-  val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
-  initialize()
-
-  // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
-  // the initialization of the compression codec until it is first used. The reason is that a Spark
-  // program could be using a user-defined codec in a third party jar, which is loaded in
-  // Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
-  // loaded yet.
-  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()
-
-  /**
-   * Construct a BlockManager with a memory limit set based on system properties.
-   */
-  def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
-           serializer: Serializer) = {
-    this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
-  }
-
-  /**
-   * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
-   * BlockManagerWorker actor.
-   */
-  private def initialize() {
-    master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
-    BlockManagerWorker.startBlockManagerWorker(this)
-    if (!BlockManager.getDisableHeartBeatsForTesting) {
-      heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
-        heartBeat()
-      }
-    }
-  }
-
-  /**
-   * Report all blocks to the BlockManager again. This may be necessary if we are dropped
-   * by the BlockManager and come back or if we become capable of recovering blocks on disk after
-   * an executor crash.
-   *
-   * This function deliberately fails silently if the master returns false (indicating that
-   * the slave needs to reregister). The error condition will be detected again by the next
-   * heart beat attempt or new block registration and another try to reregister all blocks
-   * will be made then.
-   */
-  private def reportAllBlocks() {
-    logInfo("Reporting " + blockInfo.size + " blocks to the master.")
-    for ((blockId, info) <- blockInfo) {
-      if (!tryToReportBlockStatus(blockId, info)) {
-        logError("Failed to report " + blockId + " to master; giving up.")
-        return
-      }
-    }
-  }
-
-  /**
-   * Reregister with the master and report all blocks to it. This will be called by the heart beat
-   * thread if our heartbeat to the block amnager indicates that we were not registered.
-   *
-   * Note that this method must be called without any BlockInfo locks held.
-   */
-  def reregister() {
-    // TODO: We might need to rate limit reregistering.
-    logInfo("BlockManager reregistering with master")
-    master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
-    reportAllBlocks()
-  }
-
-  /**
-   * Reregister with the master sometime soon.
-   */
-  def asyncReregister() {
-    asyncReregisterLock.synchronized {
-      if (asyncReregisterTask == null) {
-        asyncReregisterTask = Future[Unit] {
-          reregister()
-          asyncReregisterLock.synchronized {
-            asyncReregisterTask = null
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * For testing. Wait for any pending asynchronous reregistration; otherwise, do nothing.
-   */
-  def waitForAsyncReregister() {
-    val task = asyncReregisterTask
-    if (task != null) {
-      Await.ready(task, Duration.Inf)
-    }
-  }
-
-  /**
-   * Get storage level of local block. If no info exists for the block, then returns null.
-   */
-  def getLevel(blockId: String): StorageLevel = blockInfo.get(blockId).map(_.level).orNull
-
-  /**
-   * Tell the master about the current storage status of a block. This will send a block update
-   * message reflecting the current status, *not* the desired storage level in its block info.
-   * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
-   *
-   * droppedMemorySize exists to account for when block is dropped from memory to disk (so it is still valid).
-   * This ensures that update in master will compensate for the increase in memory on slave.
-   */
-  def reportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L) {
-    val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize)
-    if (needReregister) {
-      logInfo("Got told to reregister updating block " + blockId)
-      // Reregistering will report our new block for free.
-      asyncReregister()
-    }
-    logDebug("Told master about block " + blockId)
-  }
-
-  /**
-   * Actually send a UpdateBlockInfo message. Returns the mater's response,
-   * which will be true if the block was successfully recorded and false if
-   * the slave needs to re-register.
-   */
-  private def tryToReportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {
-    val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
-      info.level match {
-        case null =>
-          (StorageLevel.NONE, 0L, 0L, false)
-        case level =>
-          val inMem = level.useMemory && memoryStore.contains(blockId)
-          val onDisk = level.useDisk && diskStore.contains(blockId)
-          val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication)
-          val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize
-          val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
-          (storageLevel, memSize, diskSize, info.tellMaster)
-      }
-    }
-
-    if (tellMaster) {
-      master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
-    } else {
-      true
-    }
-  }
-
-  /**
-   * Get locations of an array of blocks.
-   */
-  def getLocationBlockIds(blockIds: Array[String]): Array[Seq[BlockManagerId]] = {
-    val startTimeMs = System.currentTimeMillis
-    val locations = master.getLocations(blockIds).toArray
-    logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
-    locations
-  }
-
-  /**
-   * A short-circuited method to get blocks directly from disk. This is used for getting
-   * shuffle blocks. It is safe to do so without a lock on block info since disk store
-   * never deletes (recent) items.
-   */
-  def getLocalFromDisk(blockId: String, serializer: Serializer): Option[Iterator[Any]] = {
-    diskStore.getValues(blockId, serializer).orElse(
-      sys.error("Block " + blockId + " not found on disk, though it should be"))
-  }
-
-  /**
-   * Get block from local block manager.
-   */
-  def getLocal(blockId: String): Option[Iterator[Any]] = {
-    logDebug("Getting local block " + blockId)
-    val info = blockInfo.get(blockId).orNull
-    if (info != null) {
-      info.synchronized {
-
-        // In the another thread is writing the block, wait for it to become ready.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning("Block " + blockId + " was marked as failure.")
-          return None
-        }
-
-        val level = info.level
-        logDebug("Level for block " + blockId + " is " + level)
-
-        // Look for the block in memory
-        if (level.useMemory) {
-          logDebug("Getting block " + blockId + " from memory")
-          memoryStore.getValues(blockId) match {
-            case Some(iterator) =>
-              return Some(iterator)
-            case None =>
-              logDebug("Block " + blockId + " not found in memory")
-          }
-        }
-
-        // Look for block on disk, potentially loading it back into memory if required
-        if (level.useDisk) {
-          logDebug("Getting block " + blockId + " from disk")
-          if (level.useMemory && level.deserialized) {
-            diskStore.getValues(blockId) match {
-              case Some(iterator) =>
-                // Put the block back in memory before returning it
-                // TODO: Consider creating a putValues that also takes in a iterator ?
-                val elements = new ArrayBuffer[Any]
-                elements ++= iterator
-                memoryStore.putValues(blockId, elements, level, true).data match {
-                  case Left(iterator2) =>
-                    return Some(iterator2)
-                  case _ =>
-                    throw new Exception("Memory store did not return back an iterator")
-                }
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          } else if (level.useMemory && !level.deserialized) {
-            // Read it as a byte buffer into memory first, then return it
-            diskStore.getBytes(blockId) match {
-              case Some(bytes) =>
-                // Put a copy of the block back in memory before returning it. Note that we can't
-                // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
-                // The use of rewind assumes this.
-                assert (0 == bytes.position())
-                val copyForMemory = ByteBuffer.allocate(bytes.limit)
-                copyForMemory.put(bytes)
-                memoryStore.putBytes(blockId, copyForMemory, level)
-                bytes.rewind()
-                return Some(dataDeserialize(blockId, bytes))
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          } else {
-            diskStore.getValues(blockId) match {
-              case Some(iterator) =>
-                return Some(iterator)
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          }
-        }
-      }
-    } else {
-      logDebug("Block " + blockId + " not registered locally")
-    }
-    return None
-  }
-
-  /**
-   * Get block from the local block manager as serialized bytes.
-   */
-  def getLocalBytes(blockId: String): Option[ByteBuffer] = {
-    // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
-    logDebug("Getting local block " + blockId + " as bytes")
-
-    // As an optimization for map output fetches, if the block is for a shuffle, return it
-    // without acquiring a lock; the disk store never deletes (recent) items so this should work
-    if (ShuffleBlockManager.isShuffle(blockId)) {
-      return diskStore.getBytes(blockId) match {
-        case Some(bytes) =>
-          Some(bytes)
-        case None =>
-          throw new Exception("Block " + blockId + " not found on disk, though it should be")
-      }
-    }
-
-    val info = blockInfo.get(blockId).orNull
-    if (info != null) {
-      info.synchronized {
-
-        // In the another thread is writing the block, wait for it to become ready.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning("Block " + blockId + " was marked as failure.")
-          return None
-        }
-
-        val level = info.level
-        logDebug("Level for block " + blockId + " is " + level)
-
-        // Look for the block in memory
-        if (level.useMemory) {
-          logDebug("Getting block " + blockId + " from memory")
-          memoryStore.getBytes(blockId) match {
-            case Some(bytes) =>
-              return Some(bytes)
-            case None =>
-              logDebug("Block " + blockId + " not found in memory")
-          }
-        }
-
-        // Look for block on disk
-        if (level.useDisk) {
-          // Read it as a byte buffer into memory first, then return it
-          diskStore.getBytes(blockId) match {
-            case Some(bytes) =>
-              assert (0 == bytes.position())
-              if (level.useMemory) {
-                if (level.deserialized) {
-                  memoryStore.putBytes(blockId, bytes, level)
-                } else {
-                  // The memory store will hang onto the ByteBuffer, so give it a copy instead of
-                  // the memory-mapped file buffer we got from the disk store
-                  val copyForMemory = ByteBuffer.allocate(bytes.limit)
-                  copyForMemory.put(bytes)
-                  memoryStore.putBytes(blockId, copyForMemory, level)
-                }
-              }
-              bytes.rewind()
-              return Some(bytes)
-            case None =>
-              throw new Exception("Block " + blockId + " not found on disk, though it should be")
-          }
-        }
-      }
-    } else {
-      logDebug("Block " + blockId + " not registered locally")
-    }
-    return None
-  }
-
-  /**
-   * Get block from remote block managers.
-   */
-  def getRemote(blockId: String): Option[Iterator[Any]] = {
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
-    logDebug("Getting remote block " + blockId)
-    // Get locations of block
-    val locations = master.getLocations(blockId)
-
-    // Get block from remote locations
-    for (loc <- locations) {
-      logDebug("Getting remote block " + blockId + " from " + loc)
-      val data = BlockManagerWorker.syncGetBlock(
-          GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
-      if (data != null) {
-        return Some(dataDeserialize(blockId, data))
-      }
-      logDebug("The value of block " + blockId + " is null")
-    }
-    logDebug("Block " + blockId + " not found")
-    return None
-  }
-
-  /**
-   * Get a block from the block manager (either local or remote).
-   */
-  def get(blockId: String): Option[Iterator[Any]] = {
-    getLocal(blockId).orElse(getRemote(blockId))
-  }
-
-  /**
-   * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns
-   * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined
-   * fashion as they're received. Expects a size in bytes to be provided for each block fetched,
-   * so that we can control the maxMegabytesInFlight for the fetch.
-   */
-  def getMultiple(
-    blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])], serializer: Serializer)
-      : BlockFetcherIterator = {
-
-    val iter =
-      if (System.getProperty("spark.shuffle.use.netty", "false").toBoolean) {
-        new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
-      } else {
-        new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
-      }
-
-    iter.initialize()
-    iter
-  }
-
-  def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
-    : Long = {
-    val elements = new ArrayBuffer[Any]
-    elements ++= values
-    put(blockId, elements, level, tellMaster)
-  }
-
-  /**
-   * A short circuited method to get a block writer that can write data directly to disk.
-   * This is currently used for writing shuffle files out. Callers should handle error
-   * cases.
-   */
-  def getDiskBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int)
-    : BlockObjectWriter = {
-    val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize)
-    writer.registerCloseEventHandler(() => {
-      val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
-      blockInfo.put(blockId, myInfo)
-      myInfo.markReady(writer.size())
-    })
-    writer
-  }
-
-  /**
-   * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
-   */
-  def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
-    tellMaster: Boolean = true) : Long = {
-
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
-    if (values == null) {
-      throw new IllegalArgumentException("Values is null")
-    }
-    if (level == null || !level.isValid) {
-      throw new IllegalArgumentException("Storage level is null or invalid")
-    }
-
-    // Remember the block's storage level so that we can correctly drop it to disk if it needs
-    // to be dropped right after it got put into memory. Note, however, that other threads will
-    // not be able to get() this block until we call markReady on its BlockInfo.
-    val myInfo = {
-      val tinfo = new BlockInfo(level, tellMaster)
-      // Do atomically !
-      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
-
-      if (oldBlockOpt.isDefined) {
-        if (oldBlockOpt.get.waitForReady()) {
-          logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
-          return oldBlockOpt.get.size
-        }
-
-        // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
-        oldBlockOpt.get
-      } else {
-        tinfo
-      }
-    }
-
-    val startTimeMs = System.currentTimeMillis
-
-    // If we need to replicate the data, we'll want access to the values, but because our
-    // put will read the whole iterator, there will be no values left. For the case where
-    // the put serializes data, we'll remember the bytes, above; but for the case where it
-    // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
-    var valuesAfterPut: Iterator[Any] = null
-
-    // Ditto for the bytes after the put
-    var bytesAfterPut: ByteBuffer = null
-
-    // Size of the block in bytes (to return to caller)
-    var size = 0L
-
-    myInfo.synchronized {
-      logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
-        + " to get into synchronized block")
-
-      var marked = false
-      try {
-        if (level.useMemory) {
-          // Save it just to memory first, even if it also has useDisk set to true; we will later
-          // drop it to disk if the memory store can't hold it.
-          val res = memoryStore.putValues(blockId, values, level, true)
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case Left(newIterator) => valuesAfterPut = newIterator
-          }
-        } else {
-          // Save directly to disk.
-          // Don't get back the bytes unless we replicate them.
-          val askForBytes = level.replication > 1
-          val res = diskStore.putValues(blockId, values, level, askForBytes)
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case _ =>
-          }
-        }
-
-        // Now that the block is in either the memory or disk store, let other threads read it,
-        // and tell the master about it.
-        marked = true
-        myInfo.markReady(size)
-        if (tellMaster) {
-          reportBlockStatus(blockId, myInfo)
-        }
-      } finally {
-        // If we failed at putting the block to memory/disk, notify other possible readers
-        // that it has failed, and then remove it from the block info map.
-        if (! marked) {
-          // Note that the remove must happen before markFailure otherwise another thread
-          // could've inserted a new BlockInfo before we remove it.
-          blockInfo.remove(blockId)
-          myInfo.markFailure()
-          logWarning("Putting block " + blockId + " failed")
-        }
-      }
-    }
-    logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
-
-    // Replicate block if required
-    if (level.replication > 1) {
-      val remoteStartTime = System.currentTimeMillis
-      // Serialize the block if not already done
-      if (bytesAfterPut == null) {
-        if (valuesAfterPut == null) {
-          throw new SparkException(
-            "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
-        }
-        bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
-      }
-      replicate(blockId, bytesAfterPut, level)
-      logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
-    }
-    BlockManager.dispose(bytesAfterPut)
-
-    return size
-  }
-
-
-  /**
-   * Put a new block of serialized bytes to the block manager.
-   */
-  def putBytes(
-    blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
-
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
-    if (bytes == null) {
-      throw new IllegalArgumentException("Bytes is null")
-    }
-    if (level == null || !level.isValid) {
-      throw new IllegalArgumentException("Storage level is null or invalid")
-    }
-
-    // Remember the block's storage level so that we can correctly drop it to disk if it needs
-    // to be dropped right after it got put into memory. Note, however, that other threads will
-    // not be able to get() this block until we call markReady on its BlockInfo.
-    val myInfo = {
-      val tinfo = new BlockInfo(level, tellMaster)
-      // Do atomically !
-      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
-
-      if (oldBlockOpt.isDefined) {
-        if (oldBlockOpt.get.waitForReady()) {
-          logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
-          return
-        }
-
-        // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
-        oldBlockOpt.get
-      } else {
-        tinfo
-      }
-    }
-
-    val startTimeMs = System.currentTimeMillis
-
-    // Initiate the replication before storing it locally. This is faster as
-    // data is already serialized and ready for sending
-    val replicationFuture = if (level.replication > 1) {
-      val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper
-      Future {
-        replicate(blockId, bufferView, level)
-      }
-    } else {
-      null
-    }
-
-    myInfo.synchronized {
-      logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
-        + " to get into synchronized block")
-
-      var marked = false
-      try {
-        if (level.useMemory) {
-          // Store it only in memory at first, even if useDisk is also set to true
-          bytes.rewind()
-          memoryStore.putBytes(blockId, bytes, level)
-        } else {
-          bytes.rewind()
-          diskStore.putBytes(blockId, bytes, level)
-        }
-
-        // assert (0 == bytes.position(), "" + bytes)
-
-        // Now that the block is in either the memory or disk store, let other threads read it,
-        // and tell the master about it.
-        marked = true
-        myInfo.markReady(bytes.limit)
-        if (tellMaster) {
-          reportBlockStatus(blockId, myInfo)
-        }
-      } finally {
-        // If we failed at putting the block to memory/disk, notify other possible readers
-        // that it has failed, and then remove it from the block info map.
-        if (! marked) {
-          // Note that the remove must happen before markFailure otherwise another thread
-          // could've inserted a new BlockInfo before we remove it.
-          blockInfo.remove(blockId)
-          myInfo.markFailure()
-          logWarning("Putting block " + blockId + " failed")
-        }
-      }
-    }
-
-    // If replication had started, then wait for it to finish
-    if (level.replication > 1) {
-      Await.ready(replicationFuture, Duration.Inf)
-    }
-
-    if (level.replication > 1) {
-      logDebug("PutBytes for block " + blockId + " with replication took " +
-        Utils.getUsedTimeMs(startTimeMs))
-    } else {
-      logDebug("PutBytes for block " + blockId + " without replication took " +
-        Utils.getUsedTimeMs(startTimeMs))
-    }
-  }
-
-  /**
-   * Replicate block to another node.
-   */
-  var cachedPeers: Seq[BlockManagerId] = null
-  private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
-    val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
-    }
-    for (peer: BlockManagerId <- cachedPeers) {
-      val start = System.nanoTime
-      data.rewind()
-      logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
-        + data.limit() + " Bytes. To node: " + peer)
-      if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
-        new ConnectionManagerId(peer.host, peer.port))) {
-        logError("Failed to call syncPutBlock to " + peer)
-      }
-      logDebug("Replicated BlockId " + blockId + " once used " +
-        (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
-        data.limit() + " bytes.")
-    }
-  }
-
-  /**
-   * Read a block consisting of a single object.
-   */
-  def getSingle(blockId: String): Option[Any] = {
-    get(blockId).map(_.next())
-  }
-
-  /**
-   * Write a block consisting of a single object.
-   */
-  def putSingle(blockId: String, value: Any, level: StorageLevel, tellMaster: Boolean = true) {
-    put(blockId, Iterator(value), level, tellMaster)
-  }
-
-  /**
-   * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
-   * store reaches its limit and needs to free up space.
-   */
-  def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
-    logInfo("Dropping block " + blockId + " from memory")
-    val info = blockInfo.get(blockId).orNull
-    if (info != null)  {
-      info.synchronized {
-        // required ? As of now, this will be invoked only for blocks which are ready
-        // But in case this changes in future, adding for consistency sake.
-        if (! info.waitForReady() ) {
-          // If we get here, the block write failed.
-          logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
-          return
-        }
-
-        val level = info.level
-        if (level.useDisk && !diskStore.contains(blockId)) {
-          logInfo("Writing block " + blockId + " to disk")
-          data match {
-            case Left(elements) =>
-              diskStore.putValues(blockId, elements, level, false)
-            case Right(bytes) =>
-              diskStore.putBytes(blockId, bytes, level)
-          }
-        }
-        val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
-        val blockWasRemoved = memoryStore.remove(blockId)
-        if (!blockWasRemoved) {
-          logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
-        }
-        if (info.tellMaster) {
-          reportBlockStatus(blockId, info, droppedMemorySize)
-        }
-        if (!level.useDisk) {
-          // The block is completely gone from this node; forget it so we can put() it again later.
-          blockInfo.remove(blockId)
-        }
-      }
-    } else {
-      // The block has already been dropped
-    }
-  }
-
-  /**
-   * Remove all blocks belonging to the given RDD.
-   * @return The number of blocks removed.
-   */
-  def removeRdd(rddId: Int): Int = {
-    // TODO: Instead of doing a linear scan on the blockInfo map, create another map that maps
-    // from RDD.id to blocks.
-    logInfo("Removing RDD " + rddId)
-    val rddPrefix = "rdd_" + rddId + "_"
-    val blocksToRemove = blockInfo.filter(_._1.startsWith(rddPrefix)).map(_._1)
-    blocksToRemove.foreach(blockId => removeBlock(blockId, false))
-    blocksToRemove.size
-  }
-
-  /**
-   * Remove a block from both memory and disk.
-   */
-  def removeBlock(blockId: String, tellMaster: Boolean = true) {
-    logInfo("Removing block " + blockId)
-    val info = blockInfo.get(blockId).orNull
-    if (info != null) info.synchronized {
-      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-      val removedFromMemory = memoryStore.remove(blockId)
-      val removedFromDisk = diskStore.remove(blockId)
-      if (!removedFromMemory && !removedFromDisk) {
-        logWarning("Block " + blockId + " could not be removed as it was not found in either " +
-          "the disk or memory store")
-      }
-      blockInfo.remove(blockId)
-      if (tellMaster && info.tellMaster) {
-        reportBlockStatus(blockId, info)
-      }
-    } else {
-      // The block has already been removed; do nothing.
-      logWarning("Asked to remove block " + blockId + ", which does not exist")
-    }
-  }
-
-  def dropOldBlocks(cleanupTime: Long) {
-    logInfo("Dropping blocks older than " + cleanupTime)
-    val iterator = blockInfo.internalMap.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
-      if (time < cleanupTime) {
-        info.synchronized {
-          val level = info.level
-          if (level.useMemory) {
-            memoryStore.remove(id)
-          }
-          if (level.useDisk) {
-            diskStore.remove(id)
-          }
-          iterator.remove()
-          logInfo("Dropped block " + id)
-        }
-        reportBlockStatus(id, info)
-      }
-    }
-  }
-
-  def shouldCompress(blockId: String): Boolean = {
-    if (ShuffleBlockManager.isShuffle(blockId)) {
-      compressShuffle
-    } else if (blockId.startsWith("broadcast_")) {
-      compressBroadcast
-    } else if (blockId.startsWith("rdd_")) {
-      compressRdds
-    } else {
-      false    // Won't happen in a real cluster, but it can in tests
-    }
-  }
-
-  /**
-   * Wrap an output stream for compression if block compression is enabled for its block type
-   */
-  def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
-    if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
-  }
-
-  /**
-   * Wrap an input stream for compression if block compression is enabled for its block type
-   */
-  def wrapForCompression(blockId: String, s: InputStream): InputStream = {
-    if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
-  }
-
-  def dataSerialize(
-      blockId: String,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
-    val byteStream = new FastByteArrayOutputStream(4096)
-    val ser = serializer.newInstance()
-    ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
-    byteStream.trim()
-    ByteBuffer.wrap(byteStream.array)
-  }
-
-  /**
-   * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
-   * the iterator is reached.
-   */
-  def dataDeserialize(
-      blockId: String,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
-    bytes.rewind()
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
-  }
-
-  def stop() {
-    if (heartBeatTask != null) {
-      heartBeatTask.cancel()
-    }
-    connectionManager.stop()
-    actorSystem.stop(slaveActor)
-    blockInfo.clear()
-    memoryStore.clear()
-    diskStore.clear()
-    metadataCleaner.cancel()
-    logInfo("BlockManager stopped")
-  }
-}
-
-
-private[spark] object BlockManager extends Logging {
-
-  val ID_GENERATOR = new IdGenerator
-
-  def getMaxMemoryFromSystemProperties: Long = {
-    val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
-    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
-  }
-
-  def getHeartBeatFrequencyFromSystemProperties: Long =
-    System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
-
-  def getDisableHeartBeatsForTesting: Boolean =
-    System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
-
-  /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
-   */
-  def dispose(buffer: ByteBuffer) {
-    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace("Unmapping " + buffer)
-      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
-        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
-      }
-    }
-  }
-
-  def blockIdsToBlockManagers(
-      blockIds: Array[String],
-      env: SparkEnv,
-      blockManagerMaster: BlockManagerMaster = null)
-  : Map[String, Seq[BlockManagerId]] =
-  {
-    // env == null and blockManagerMaster != null is used in tests
-    assert (env != null || blockManagerMaster != null)
-    val blockLocations: Seq[Seq[BlockManagerId]] = if (env != null) {
-      env.blockManager.getLocationBlockIds(blockIds)
-    } else {
-      blockManagerMaster.getLocations(blockIds)
-    }
-
-    val blockManagers = new HashMap[String, Seq[BlockManagerId]]
-    for (i <- 0 until blockIds.length) {
-      blockManagers(blockIds(i)) = blockLocations(i)
-    }
-    blockManagers.toMap
-  }
-
-  def blockIdsToExecutorIds(
-      blockIds: Array[String],
-      env: SparkEnv,
-      blockManagerMaster: BlockManagerMaster = null)
-    : Map[String, Seq[String]] =
-  {
-    blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.executorId))
-  }
-
-  def blockIdsToHosts(
-      blockIds: Array[String],
-      env: SparkEnv,
-      blockManagerMaster: BlockManagerMaster = null)
-    : Map[String, Seq[String]] =
-  {
-    blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
deleted file mode 100644
index b36a617..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerId.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
-import java.util.concurrent.ConcurrentHashMap
-import spark.Utils
-
-/**
- * This class represent an unique identifier for a BlockManager.
- * The first 2 constructors of this class is made private to ensure that
- * BlockManagerId objects can be created only using the apply method in
- * the companion object. This allows de-duplication of ID objects.
- * Also, constructor parameters are private to ensure that parameters cannot
- * be modified from outside this class.
- */
-private[spark] class BlockManagerId private (
-    private var executorId_ : String,
-    private var host_ : String,
-    private var port_ : Int,
-    private var nettyPort_ : Int
-  ) extends Externalizable {
-
-  private def this() = this(null, null, 0, 0)  // For deserialization only
-
-  def executorId: String = executorId_
-
-  if (null != host_){
-    Utils.checkHost(host_, "Expected hostname")
-    assert (port_ > 0)
-  }
-
-  def hostPort: String = {
-    // DEBUG code
-    Utils.checkHost(host)
-    assert (port > 0)
-
-    host + ":" + port
-  }
-
-  def host: String = host_
-
-  def port: Int = port_
-
-  def nettyPort: Int = nettyPort_
-
-  override def writeExternal(out: ObjectOutput) {
-    out.writeUTF(executorId_)
-    out.writeUTF(host_)
-    out.writeInt(port_)
-    out.writeInt(nettyPort_)
-  }
-
-  override def readExternal(in: ObjectInput) {
-    executorId_ = in.readUTF()
-    host_ = in.readUTF()
-    port_ = in.readInt()
-    nettyPort_ = in.readInt()
-  }
-
-  @throws(classOf[IOException])
-  private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
-
-  override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port, nettyPort)
-
-  override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort
-
-  override def equals(that: Any) = that match {
-    case id: BlockManagerId =>
-      executorId == id.executorId && port == id.port && host == id.host && nettyPort == id.nettyPort
-    case _ =>
-      false
-  }
-}
-
-
-private[spark] object BlockManagerId {
-
-  /**
-   * Returns a [[spark.storage.BlockManagerId]] for the given configuraiton.
-   *
-   * @param execId ID of the executor.
-   * @param host Host name of the block manager.
-   * @param port Port of the block manager.
-   * @param nettyPort Optional port for the Netty-based shuffle sender.
-   * @return A new [[spark.storage.BlockManagerId]].
-   */
-  def apply(execId: String, host: String, port: Int, nettyPort: Int) =
-    getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))
-
-  def apply(in: ObjectInput) = {
-    val obj = new BlockManagerId()
-    obj.readExternal(in)
-    getCachedBlockManagerId(obj)
-  }
-
-  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
-
-  def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
-    blockManagerIdCache.putIfAbsent(id, id)
-    blockManagerIdCache.get(id)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
deleted file mode 100644
index 76128e8..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import akka.actor.ActorRef
-import akka.dispatch.{Await, Future}
-import akka.pattern.ask
-import akka.util.Duration
-
-import spark.{Logging, SparkException}
-import spark.storage.BlockManagerMessages._
-
-
-private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
-
-  val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
-  val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
-
-  val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
-
-  val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
-  /** Remove a dead executor from the driver actor. This is only called on the driver side. */
-  def removeExecutor(execId: String) {
-    tell(RemoveExecutor(execId))
-    logInfo("Removed " + execId + " successfully in removeExecutor")
-  }
-
-  /**
-   * Send the driver actor a heart beat from the slave. Returns true if everything works out,
-   * false if the driver does not know about the given block manager, which means the block
-   * manager should re-register.
-   */
-  def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
-    askDriverWithReply[Boolean](HeartBeat(blockManagerId))
-  }
-
-  /** Register the BlockManager's id with the driver. */
-  def registerBlockManager(
-      blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
-    logInfo("Trying to register BlockManager")
-    tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
-    logInfo("Registered BlockManager")
-  }
-
-  def updateBlockInfo(
-      blockManagerId: BlockManagerId,
-      blockId: String,
-      storageLevel: StorageLevel,
-      memSize: Long,
-      diskSize: Long): Boolean = {
-    val res = askDriverWithReply[Boolean](
-      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
-    logInfo("Updated info of block " + blockId)
-    res
-  }
-
-  /** Get locations of the blockId from the driver */
-  def getLocations(blockId: String): Seq[BlockManagerId] = {
-    askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
-  }
-
-  /** Get locations of multiple blockIds from the driver */
-  def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
-    askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
-  }
-
-  /** Get ids of other nodes in the cluster from the driver */
-  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
-    val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
-    if (result.length != numPeers) {
-      throw new SparkException(
-        "Error getting peers, only got " + result.size + " instead of " + numPeers)
-    }
-    result
-  }
-
-  /**
-   * Remove a block from the slaves that have it. This can only be used to remove
-   * blocks that the driver knows about.
-   */
-  def removeBlock(blockId: String) {
-    askDriverWithReply(RemoveBlock(blockId))
-  }
-
-  /**
-   * Remove all blocks belonging to the given RDD.
-   */
-  def removeRdd(rddId: Int, blocking: Boolean) {
-    val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
-    future onFailure {
-      case e: Throwable => logError("Failed to remove RDD " + rddId, e)
-    }
-    if (blocking) {
-      Await.result(future, timeout)
-    }
-  }
-
-  /**
-   * Return the memory status for each block manager, in the form of a map from
-   * the block manager's id to two long values. The first value is the maximum
-   * amount of memory allocated for the block manager, while the second is the
-   * amount of remaining memory.
-   */
-  def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
-  }
-
-  def getStorageStatus: Array[StorageStatus] = {
-    askDriverWithReply[Array[StorageStatus]](GetStorageStatus)
-  }
-
-  /** Stop the driver actor, called only on the Spark driver node */
-  def stop() {
-    if (driverActor != null) {
-      tell(StopBlockManagerMaster)
-      driverActor = null
-      logInfo("BlockManagerMaster stopped")
-    }
-  }
-
-  /** Send a one-way message to the master actor, to which we expect it to reply with true. */
-  private def tell(message: Any) {
-    if (!askDriverWithReply[Boolean](message)) {
-      throw new SparkException("BlockManagerMasterActor returned false, expected true.")
-    }
-  }
-
-  /**
-   * Send a message to the driver actor and get its result within a default timeout, or
-   * throw a SparkException if this fails.
-   */
-  private def askDriverWithReply[T](message: Any): T = {
-    // TODO: Consider removing multiple attempts
-    if (driverActor == null) {
-      throw new SparkException("Error sending message to BlockManager as driverActor is null " +
-        "[message = " + message + "]")
-    }
-    var attempts = 0
-    var lastException: Exception = null
-    while (attempts < AKKA_RETRY_ATTEMPTS) {
-      attempts += 1
-      try {
-        val future = driverActor.ask(message)(timeout)
-        val result = Await.result(future, timeout)
-        if (result == null) {
-          throw new SparkException("BlockManagerMaster returned null")
-        }
-        return result.asInstanceOf[T]
-      } catch {
-        case ie: InterruptedException => throw ie
-        case e: Exception =>
-          lastException = e
-          logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
-      }
-      Thread.sleep(AKKA_RETRY_INTERVAL_MS)
-    }
-
-    throw new SparkException(
-      "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
deleted file mode 100644
index b7a981d..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.mutable
-import scala.collection.JavaConversions._
-
-import akka.actor.{Actor, ActorRef, Cancellable}
-import akka.dispatch.Future
-import akka.pattern.ask
-import akka.util.Duration
-import akka.util.duration._
-
-import spark.{Logging, Utils, SparkException}
-import spark.storage.BlockManagerMessages._
-
-
-/**
- * BlockManagerMasterActor is an actor on the master node to track statuses of
- * all slaves' block managers.
- */
-private[spark]
-class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
-
-  // Mapping from block manager id to the block manager's information.
-  private val blockManagerInfo =
-    new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
-
-  // Mapping from executor ID to block manager ID.
-  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
-
-  // Mapping from block id to the set of block managers that have the block.
-  private val blockLocations = new JHashMap[String, mutable.HashSet[BlockManagerId]]
-
-  val akkaTimeout = Duration.create(
-    System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
-  initLogging()
-
-  val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
-    "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
-
-  val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
-    "60000").toLong
-
-  var timeoutCheckingTask: Cancellable = null
-
-  override def preStart() {
-    if (!BlockManager.getDisableHeartBeatsForTesting) {
-      timeoutCheckingTask = context.system.scheduler.schedule(
-        0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
-    }
-    super.preStart()
-  }
-
-  def receive = {
-    case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
-      register(blockManagerId, maxMemSize, slaveActor)
-      sender ! true
-
-    case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
-      // TODO: Ideally we want to handle all the message replies in receive instead of in the
-      // individual private methods.
-      updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
-
-    case GetLocations(blockId) =>
-      sender ! getLocations(blockId)
-
-    case GetLocationsMultipleBlockIds(blockIds) =>
-      sender ! getLocationsMultipleBlockIds(blockIds)
-
-    case GetPeers(blockManagerId, size) =>
-      sender ! getPeers(blockManagerId, size)
-
-    case GetMemoryStatus =>
-      sender ! memoryStatus
-
-    case GetStorageStatus =>
-      sender ! storageStatus
-
-    case RemoveRdd(rddId) =>
-      sender ! removeRdd(rddId)
-
-    case RemoveBlock(blockId) =>
-      removeBlockFromWorkers(blockId)
-      sender ! true
-
-    case RemoveExecutor(execId) =>
-      removeExecutor(execId)
-      sender ! true
-
-    case StopBlockManagerMaster =>
-      logInfo("Stopping BlockManagerMaster")
-      sender ! true
-      if (timeoutCheckingTask != null) {
-        timeoutCheckingTask.cancel()
-      }
-      context.stop(self)
-
-    case ExpireDeadHosts =>
-      expireDeadHosts()
-
-    case HeartBeat(blockManagerId) =>
-      sender ! heartBeat(blockManagerId)
-
-    case other =>
-      logWarning("Got unknown message: " + other)
-  }
-
-  private def removeRdd(rddId: Int): Future[Seq[Int]] = {
-    // First remove the metadata for the given RDD, and then asynchronously remove the blocks
-    // from the slaves.
-
-    val prefix = "rdd_" + rddId + "_"
-    // Find all blocks for the given RDD, remove the block from both blockLocations and
-    // the blockManagerInfo that is tracking the blocks.
-    val blocks = blockLocations.keySet().filter(_.startsWith(prefix))
-    blocks.foreach { blockId =>
-      val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
-      bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
-      blockLocations.remove(blockId)
-    }
-
-    // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
-    // The dispatcher is used as an implicit argument into the Future sequence construction.
-    import context.dispatcher
-    val removeMsg = RemoveRdd(rddId)
-    Future.sequence(blockManagerInfo.values.map { bm =>
-      bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
-    }.toSeq)
-  }
-
-  private def removeBlockManager(blockManagerId: BlockManagerId) {
-    val info = blockManagerInfo(blockManagerId)
-
-    // Remove the block manager from blockManagerIdByExecutor.
-    blockManagerIdByExecutor -= blockManagerId.executorId
-
-    // Remove it from blockManagerInfo and remove all the blocks.
-    blockManagerInfo.remove(blockManagerId)
-    val iterator = info.blocks.keySet.iterator
-    while (iterator.hasNext) {
-      val blockId = iterator.next
-      val locations = blockLocations.get(blockId)
-      locations -= blockManagerId
-      if (locations.size == 0) {
-        blockLocations.remove(locations)
-      }
-    }
-  }
-
-  private def expireDeadHosts() {
-    logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
-    val now = System.currentTimeMillis()
-    val minSeenTime = now - slaveTimeout
-    val toRemove = new mutable.HashSet[BlockManagerId]
-    for (info <- blockManagerInfo.values) {
-      if (info.lastSeenMs < minSeenTime) {
-        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
-          (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
-        toRemove += info.blockManagerId
-      }
-    }
-    toRemove.foreach(removeBlockManager)
-  }
-
-  private def removeExecutor(execId: String) {
-    logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
-    blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
-  }
-
-  private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
-    if (!blockManagerInfo.contains(blockManagerId)) {
-      blockManagerId.executorId == "<driver>" && !isLocal
-    } else {
-      blockManagerInfo(blockManagerId).updateLastSeenMs()
-      true
-    }
-  }
-
-  // Remove a block from the slaves that have it. This can only be used to remove
-  // blocks that the master knows about.
-  private def removeBlockFromWorkers(blockId: String) {
-    val locations = blockLocations.get(blockId)
-    if (locations != null) {
-      locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
-        if (blockManager.isDefined) {
-          // Remove the block from the slave's BlockManager.
-          // Doesn't actually wait for a confirmation and the message might get lost.
-          // If message loss becomes frequent, we should add retry logic here.
-          blockManager.get.slaveActor ! RemoveBlock(blockId)
-        }
-      }
-    }
-  }
-
-  // Return a map from the block manager id to max memory and remaining memory.
-  private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    blockManagerInfo.map { case(blockManagerId, info) =>
-      (blockManagerId, (info.maxMem, info.remainingMem))
-    }.toMap
-  }
-
-  private def storageStatus: Array[StorageStatus] = {
-    blockManagerInfo.map { case(blockManagerId, info) =>
-      import collection.JavaConverters._
-      StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
-    }.toArray
-  }
-
-  private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
-    if (id.executorId == "<driver>" && !isLocal) {
-      // Got a register message from the master node; don't register it
-    } else if (!blockManagerInfo.contains(id)) {
-      blockManagerIdByExecutor.get(id.executorId) match {
-        case Some(manager) =>
-          // A block manager of the same executor already exists.
-          // This should never happen. Let's just quit.
-          logError("Got two different block manager registrations on " + id.executorId)
-          System.exit(1)
-        case None =>
-          blockManagerIdByExecutor(id.executorId) = id
-      }
-      blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
-        id, System.currentTimeMillis(), maxMemSize, slaveActor)
-    }
-  }
-
-  private def updateBlockInfo(
-      blockManagerId: BlockManagerId,
-      blockId: String,
-      storageLevel: StorageLevel,
-      memSize: Long,
-      diskSize: Long) {
-
-    if (!blockManagerInfo.contains(blockManagerId)) {
-      if (blockManagerId.executorId == "<driver>" && !isLocal) {
-        // We intentionally do not register the master (except in local mode),
-        // so we should not indicate failure.
-        sender ! true
-      } else {
-        sender ! false
-      }
-      return
-    }
-
-    if (blockId == null) {
-      blockManagerInfo(blockManagerId).updateLastSeenMs()
-      sender ! true
-      return
-    }
-
-    blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
-
-    var locations: mutable.HashSet[BlockManagerId] = null
-    if (blockLocations.containsKey(blockId)) {
-      locations = blockLocations.get(blockId)
-    } else {
-      locations = new mutable.HashSet[BlockManagerId]
-      blockLocations.put(blockId, locations)
-    }
-
-    if (storageLevel.isValid) {
-      locations.add(blockManagerId)
-    } else {
-      locations.remove(blockManagerId)
-    }
-
-    // Remove the block from master tracking if it has been removed on all slaves.
-    if (locations.size == 0) {
-      blockLocations.remove(blockId)
-    }
-    sender ! true
-  }
-
-  private def getLocations(blockId: String): Seq[BlockManagerId] = {
-    if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
-  }
-
-  private def getLocationsMultipleBlockIds(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
-    blockIds.map(blockId => getLocations(blockId))
-  }
-
-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
-    if (selfIndex == -1) {
-      throw new SparkException("Self index for " + blockManagerId + " not found")
-    }
-
-    // Note that this logic will select the same node multiple times if there aren't enough peers
-    Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
-  }
-}
-
-
-private[spark]
-object BlockManagerMasterActor {
-
-  case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
-
-  class BlockManagerInfo(
-      val blockManagerId: BlockManagerId,
-      timeMs: Long,
-      val maxMem: Long,
-      val slaveActor: ActorRef)
-    extends Logging {
-
-    private var _lastSeenMs: Long = timeMs
-    private var _remainingMem: Long = maxMem
-
-    // Mapping from block id to its status.
-    private val _blocks = new JHashMap[String, BlockStatus]
-
-    logInfo("Registering block manager %s with %s RAM".format(
-      blockManagerId.hostPort, Utils.bytesToString(maxMem)))
-
-    def updateLastSeenMs() {
-      _lastSeenMs = System.currentTimeMillis()
-    }
-
-    def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long,
-                        diskSize: Long) {
-
-      updateLastSeenMs()
-
-      if (_blocks.containsKey(blockId)) {
-        // The block exists on the slave already.
-        val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
-
-        if (originalLevel.useMemory) {
-          _remainingMem += memSize
-        }
-      }
-
-      if (storageLevel.isValid) {
-        // isValid means it is either stored in-memory or on-disk.
-        _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
-        if (storageLevel.useMemory) {
-          _remainingMem -= memSize
-          logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
-            blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
-            Utils.bytesToString(_remainingMem)))
-        }
-        if (storageLevel.useDisk) {
-          logInfo("Added %s on disk on %s (size: %s)".format(
-            blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
-        }
-      } else if (_blocks.containsKey(blockId)) {
-        // If isValid is not true, drop the block.
-        val blockStatus: BlockStatus = _blocks.get(blockId)
-        _blocks.remove(blockId)
-        if (blockStatus.storageLevel.useMemory) {
-          _remainingMem += blockStatus.memSize
-          logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
-            blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
-            Utils.bytesToString(_remainingMem)))
-        }
-        if (blockStatus.storageLevel.useDisk) {
-          logInfo("Removed %s on %s on disk (size: %s)".format(
-            blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
-        }
-      }
-    }
-
-    def removeBlock(blockId: String) {
-      if (_blocks.containsKey(blockId)) {
-        _remainingMem += _blocks.get(blockId).memSize
-        _blocks.remove(blockId)
-      }
-    }
-
-    def remainingMem: Long = _remainingMem
-
-    def lastSeenMs: Long = _lastSeenMs
-
-    def blocks: JHashMap[String, BlockStatus] = _blocks
-
-    override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
-
-    def clear() {
-      _blocks.clear()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
deleted file mode 100644
index 9375a9c..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-
-import akka.actor.ActorRef
-
-
-private[storage] object BlockManagerMessages {
-  //////////////////////////////////////////////////////////////////////////////////
-  // Messages from the master to slaves.
-  //////////////////////////////////////////////////////////////////////////////////
-  sealed trait ToBlockManagerSlave
-
-  // Remove a block from the slaves that have it. This can only be used to remove
-  // blocks that the master knows about.
-  case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
-
-  // Remove all blocks belonging to a specific RDD.
-  case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
-
-
-  //////////////////////////////////////////////////////////////////////////////////
-  // Messages from slaves to the master.
-  //////////////////////////////////////////////////////////////////////////////////
-  sealed trait ToBlockManagerMaster
-
-  case class RegisterBlockManager(
-      blockManagerId: BlockManagerId,
-      maxMemSize: Long,
-      sender: ActorRef)
-    extends ToBlockManagerMaster
-
-  case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-  class UpdateBlockInfo(
-      var blockManagerId: BlockManagerId,
-      var blockId: String,
-      var storageLevel: StorageLevel,
-      var memSize: Long,
-      var diskSize: Long)
-    extends ToBlockManagerMaster
-    with Externalizable {
-
-    def this() = this(null, null, null, 0, 0)  // For deserialization only
-
-    override def writeExternal(out: ObjectOutput) {
-      blockManagerId.writeExternal(out)
-      out.writeUTF(blockId)
-      storageLevel.writeExternal(out)
-      out.writeLong(memSize)
-      out.writeLong(diskSize)
-    }
-
-    override def readExternal(in: ObjectInput) {
-      blockManagerId = BlockManagerId(in)
-      blockId = in.readUTF()
-      storageLevel = StorageLevel(in)
-      memSize = in.readLong()
-      diskSize = in.readLong()
-    }
-  }
-
-  object UpdateBlockInfo {
-    def apply(blockManagerId: BlockManagerId,
-        blockId: String,
-        storageLevel: StorageLevel,
-        memSize: Long,
-        diskSize: Long): UpdateBlockInfo = {
-      new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
-    }
-
-    // For pattern-matching
-    def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
-      Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
-    }
-  }
-
-  case class GetLocations(blockId: String) extends ToBlockManagerMaster
-
-  case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-
-  case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-
-  case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
-
-  case object StopBlockManagerMaster extends ToBlockManagerMaster
-
-  case object GetMemoryStatus extends ToBlockManagerMaster
-
-  case object ExpireDeadHosts extends ToBlockManagerMaster
-
-  case object GetStorageStatus extends ToBlockManagerMaster
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
deleted file mode 100644
index 6e5fb43..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import akka.actor.Actor
-
-import spark.storage.BlockManagerMessages._
-
-
-/**
- * An actor to take commands from the master to execute options. For example,
- * this is used to remove blocks from the slave's BlockManager.
- */
-class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
-  override def receive = {
-
-    case RemoveBlock(blockId) =>
-      blockManager.removeBlock(blockId)
-
-    case RemoveRdd(rddId) =>
-      val numBlocksRemoved = blockManager.removeRdd(rddId)
-      sender ! numBlocksRemoved
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala
deleted file mode 100644
index 2aecd1e..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerSource.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-package spark.storage
-
-import com.codahale.metrics.{Gauge,MetricRegistry}
-
-import spark.metrics.source.Source
-
-
-private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
-  val metricRegistry = new MetricRegistry()
-  val sourceName = "BlockManager"
-
-  metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
-      maxMem / 1024 / 1024
-    }
-  })
-
-  metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
-      remainingMem / 1024 / 1024
-    }
-  })
-
-  metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
-      (maxMem - remainingMem) / 1024 / 1024
-    }
-  })
-
-  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val diskSpaceUsed = storageStatusList
-      	.flatMap(_.blocks.values.map(_.diskSize))
-      	.reduceOption(_ + _)
-      	.getOrElse(0L)
-
-      diskSpaceUsed / 1024 / 1024
-    }
-  })
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockManagerWorker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
deleted file mode 100644
index 39064bc..0000000
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.nio.ByteBuffer
-
-import spark.{Logging, Utils}
-import spark.network._
-
-/**
- * A network interface for BlockManager. Each slave should have one
- * BlockManagerWorker.
- *
- * TODO: Use event model.
- */
-private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
-  initLogging()
-
-  blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
-
-  def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = {
-    logDebug("Handling message " + msg)
-    msg match {
-      case bufferMessage: BufferMessage => {
-        try {
-          logDebug("Handling as a buffer message " + bufferMessage)
-          val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
-          logDebug("Parsed as a block message array")
-          val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
-          return Some(new BlockMessageArray(responseMessages).toBufferMessage)
-        } catch {
-          case e: Exception => logError("Exception handling buffer message", e)
-          return None
-        }
-      }
-      case otherMessage: Any => {
-        logError("Unknown type message received: " + otherMessage)
-        return None
-      }
-    }
-  }
-
-  def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
-    blockMessage.getType match {
-      case BlockMessage.TYPE_PUT_BLOCK => {
-        val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
-        logDebug("Received [" + pB + "]")
-        putBlock(pB.id, pB.data, pB.level)
-        return None
-      }
-      case BlockMessage.TYPE_GET_BLOCK => {
-        val gB = new GetBlock(blockMessage.getId)
-        logDebug("Received [" + gB + "]")
-        val buffer = getBlock(gB.id)
-        if (buffer == null) {
-          return None
-        }
-        return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
-      }
-      case _ => return None
-    }
-  }
-
-  private def putBlock(id: String, bytes: ByteBuffer, level: StorageLevel) {
-    val startTimeMs = System.currentTimeMillis()
-    logDebug("PutBlock " + id + " started from " + startTimeMs + " with data: " + bytes)
-    blockManager.putBytes(id, bytes, level)
-    logDebug("PutBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
-        + " with data size: " + bytes.limit)
-  }
-
-  private def getBlock(id: String): ByteBuffer = {
-    val startTimeMs = System.currentTimeMillis()
-    logDebug("GetBlock " + id + " started from " + startTimeMs)
-    val buffer = blockManager.getLocalBytes(id) match {
-      case Some(bytes) => bytes
-      case None => null
-    }
-    logDebug("GetBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
-        + " and got buffer " + buffer)
-    return buffer
-  }
-}
-
-private[spark] object BlockManagerWorker extends Logging {
-  private var blockManagerWorker: BlockManagerWorker = null
-
-  initLogging()
-
-  def startBlockManagerWorker(manager: BlockManager) {
-    blockManagerWorker = new BlockManagerWorker(manager)
-  }
-
-  def syncPutBlock(msg: PutBlock, toConnManagerId: ConnectionManagerId): Boolean = {
-    val blockManager = blockManagerWorker.blockManager
-    val connectionManager = blockManager.connectionManager
-    val blockMessage = BlockMessage.fromPutBlock(msg)
-    val blockMessageArray = new BlockMessageArray(blockMessage)
-    val resultMessage = connectionManager.sendMessageReliablySync(
-        toConnManagerId, blockMessageArray.toBufferMessage)
-    return (resultMessage != None)
-  }
-
-  def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
-    val blockManager = blockManagerWorker.blockManager
-    val connectionManager = blockManager.connectionManager
-    val blockMessage = BlockMessage.fromGetBlock(msg)
-    val blockMessageArray = new BlockMessageArray(blockMessage)
-    val responseMessage = connectionManager.sendMessageReliablySync(
-        toConnManagerId, blockMessageArray.toBufferMessage)
-    responseMessage match {
-      case Some(message) => {
-        val bufferMessage = message.asInstanceOf[BufferMessage]
-        logDebug("Response message received " + bufferMessage)
-        BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
-            logDebug("Found " + blockMessage)
-            return blockMessage.getData
-          })
-      }
-      case None => logDebug("No response message received"); return null
-    }
-    return null
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
deleted file mode 100644
index bcce26b..0000000
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.StringBuilder
-import scala.collection.mutable.ArrayBuffer
-
-import spark.network._
-
-private[spark] case class GetBlock(id: String)
-private[spark] case class GotBlock(id: String, data: ByteBuffer)
-private[spark] case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel) 
-
-private[spark] class BlockMessage() {
-  // Un-initialized: typ = 0
-  // GetBlock: typ = 1
-  // GotBlock: typ = 2
-  // PutBlock: typ = 3
-  private var typ: Int = BlockMessage.TYPE_NON_INITIALIZED
-  private var id: String = null
-  private var data: ByteBuffer = null
-  private var level: StorageLevel = null
- 
-  def set(getBlock: GetBlock) {
-    typ = BlockMessage.TYPE_GET_BLOCK
-    id = getBlock.id
-  }
-
-  def set(gotBlock: GotBlock) {
-    typ = BlockMessage.TYPE_GOT_BLOCK
-    id = gotBlock.id
-    data = gotBlock.data
-  }
-
-  def set(putBlock: PutBlock) {
-    typ = BlockMessage.TYPE_PUT_BLOCK
-    id = putBlock.id
-    data = putBlock.data
-    level = putBlock.level
-  }
-
-  def set(buffer: ByteBuffer) {
-    val startTime = System.currentTimeMillis
-    /*
-    println()
-    println("BlockMessage: ")
-    while(buffer.remaining > 0) {
-      print(buffer.get())
-    }
-    buffer.rewind()
-    println()
-    println()
-    */
-    typ = buffer.getInt()
-    val idLength = buffer.getInt()
-    val idBuilder = new StringBuilder(idLength)
-    for (i <- 1 to idLength) {
-      idBuilder += buffer.getChar()
-    }
-    id = idBuilder.toString()
-    
-    if (typ == BlockMessage.TYPE_PUT_BLOCK) {
-
-      val booleanInt = buffer.getInt()
-      val replication = buffer.getInt()
-      level = StorageLevel(booleanInt, replication)
-      
-      val dataLength = buffer.getInt()
-      data = ByteBuffer.allocate(dataLength)
-      if (dataLength != buffer.remaining) {
-        throw new Exception("Error parsing buffer")
-      }
-      data.put(buffer)
-      data.flip()
-    } else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
-
-      val dataLength = buffer.getInt()
-      data = ByteBuffer.allocate(dataLength)
-      if (dataLength != buffer.remaining) {
-        throw new Exception("Error parsing buffer")
-      }
-      data.put(buffer)
-      data.flip()
-    }
-
-    val finishTime = System.currentTimeMillis
-  }
-
-  def set(bufferMsg: BufferMessage) {
-    val buffer = bufferMsg.buffers.apply(0)
-    buffer.clear()
-    set(buffer)
-  }
-  
-  def getType: Int = {
-    return typ
-  }
-  
-  def getId: String = {
-    return id
-  }
-  
-  def getData: ByteBuffer = {
-    return data
-  }
-  
-  def getLevel: StorageLevel = {
-    return level
-  }
-  
-  def toBufferMessage: BufferMessage = {
-    val startTime = System.currentTimeMillis
-    val buffers = new ArrayBuffer[ByteBuffer]()
-    var buffer = ByteBuffer.allocate(4 + 4 + id.length() * 2)
-    buffer.putInt(typ).putInt(id.length())
-    id.foreach((x: Char) => buffer.putChar(x))
-    buffer.flip()
-    buffers += buffer
-
-    if (typ == BlockMessage.TYPE_PUT_BLOCK) {
-      buffer = ByteBuffer.allocate(8).putInt(level.toInt).putInt(level.replication)
-      buffer.flip()
-      buffers += buffer
-      
-      buffer = ByteBuffer.allocate(4).putInt(data.remaining)
-      buffer.flip()
-      buffers += buffer
-
-      buffers += data
-    } else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
-      buffer = ByteBuffer.allocate(4).putInt(data.remaining)
-      buffer.flip()
-      buffers += buffer
-
-      buffers += data
-    }
-    
-    /*
-    println()
-    println("BlockMessage: ")
-    buffers.foreach(b => {
-      while(b.remaining > 0) {
-        print(b.get())
-      }
-      b.rewind()
-    })
-    println()
-    println()
-    */
-    val finishTime = System.currentTimeMillis
-    return Message.createBufferMessage(buffers)
-  }
-
-  override def toString: String = {
-    "BlockMessage [type = " + typ + ", id = " + id + ", level = " + level + 
-    ", data = " + (if (data != null) data.remaining.toString  else "null") + "]"
-  }
-}
-
-private[spark] object BlockMessage {
-  val TYPE_NON_INITIALIZED: Int = 0
-  val TYPE_GET_BLOCK: Int = 1
-  val TYPE_GOT_BLOCK: Int = 2
-  val TYPE_PUT_BLOCK: Int = 3
- 
-  def fromBufferMessage(bufferMessage: BufferMessage): BlockMessage = {
-    val newBlockMessage = new BlockMessage()
-    newBlockMessage.set(bufferMessage)
-    newBlockMessage
-  }
-
-  def fromByteBuffer(buffer: ByteBuffer): BlockMessage = {
-    val newBlockMessage = new BlockMessage()
-    newBlockMessage.set(buffer)
-    newBlockMessage
-  }
-
-  def fromGetBlock(getBlock: GetBlock): BlockMessage = {
-    val newBlockMessage = new BlockMessage()
-    newBlockMessage.set(getBlock)
-    newBlockMessage
-  }
-
-  def fromGotBlock(gotBlock: GotBlock): BlockMessage = {
-    val newBlockMessage = new BlockMessage()
-    newBlockMessage.set(gotBlock)
-    newBlockMessage
-  }
-  
-  def fromPutBlock(putBlock: PutBlock): BlockMessage = {
-    val newBlockMessage = new BlockMessage()
-    newBlockMessage.set(putBlock)
-    newBlockMessage
-  }
-
-  def main(args: Array[String]) {
-    val B = new BlockMessage()
-    B.set(new PutBlock("ABC", ByteBuffer.allocate(10), StorageLevel.MEMORY_AND_DISK_SER_2))
-    val bMsg = B.toBufferMessage
-    val C = new BlockMessage()
-    C.set(bMsg)
-    
-    println(B.getId + " " + B.getLevel)
-    println(C.getId + " " + C.getLevel)
-  }
-}