Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/19 21:17:13 UTC

[4/6] [SPARK-1132] Persisting Web UI through refactoring the SparkListener interface

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 28b019d..06b041e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -45,8 +45,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 private[spark] class CoarseMesosSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext,
-    master: String,
-    appName: String)
+    master: String)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with MScheduler
   with Logging {
@@ -94,7 +93,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         setDaemon(true)
         override def run() {
           val scheduler = CoarseMesosSchedulerBackend.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(sc.appName).build()
           driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
           try { {
             val ret = driver.run()

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index bcf0ce1..4092dd0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -41,8 +41,7 @@ import org.apache.spark.util.Utils
 private[spark] class MesosSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext,
-    master: String,
-    appName: String)
+    master: String)
   extends SchedulerBackend
   with MScheduler
   with Logging {
@@ -71,7 +70,7 @@ private[spark] class MesosSchedulerBackend(
         setDaemon(true)
         override def run() {
           val scheduler = MesosSchedulerBackend.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(sc.appName).build()
           driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
           try {
             val ret = driver.run()
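
Both Mesos backends now take the application name from the SparkContext rather than as a
constructor argument. A minimal sketch of the updated call site, with the surrounding
scheduler setup assumed rather than shown (it is not part of this diff):

    // Hypothetical call site, e.g. inside SparkContext's scheduler creation logic.
    // Before this patch: new MesosSchedulerBackend(scheduler, sc, master, appName)
    // After: the appName argument is gone; the backend reads sc.appName itself.
    val backend = new MesosSchedulerBackend(scheduler, sc, master)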

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1bf3f4d..71584b6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
 import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
 import sun.nio.ch.DirectBuffer
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException, SecurityManager}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
@@ -92,7 +92,7 @@ private[spark] class BlockManager(
   val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
     name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
 
-  // Pending reregistration action being executed asynchronously or null if none
+  // Pending re-registration action being executed asynchronously or null if none
   // is pending. Accesses should synchronize on asyncReregisterLock.
   var asyncReregisterTask: Future[Unit] = null
   val asyncReregisterLock = new Object
@@ -122,10 +122,15 @@ private[spark] class BlockManager(
   /**
    * Construct a BlockManager with a memory limit set based on system properties.
    */
-  def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
-           serializer: Serializer, conf: SparkConf, securityManager: SecurityManager) = {
-    this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf, 
-      securityManager)
+  def this(
+      execId: String,
+      actorSystem: ActorSystem,
+      master: BlockManagerMaster,
+      serializer: Serializer,
+      conf: SparkConf,
+      securityManager: SecurityManager) = {
+    this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
+      conf, securityManager)
   }
 
   /**
@@ -148,14 +153,15 @@ private[spark] class BlockManager(
    * an executor crash.
    *
    * This function deliberately fails silently if the master returns false (indicating that
-   * the slave needs to reregister). The error condition will be detected again by the next
-   * heart beat attempt or new block registration and another try to reregister all blocks
+   * the slave needs to re-register). The error condition will be detected again by the next
+   * heart beat attempt or new block registration and another try to re-register all blocks
    * will be made then.
    */
   private def reportAllBlocks() {
     logInfo("Reporting " + blockInfo.size + " blocks to the master.")
     for ((blockId, info) <- blockInfo) {
-      if (!tryToReportBlockStatus(blockId, info)) {
+      val status = getCurrentBlockStatus(blockId, info)
+      if (!tryToReportBlockStatus(blockId, info, status)) {
         logError("Failed to report " + blockId + " to master; giving up.")
         return
       }
@@ -163,20 +169,20 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Reregister with the master and report all blocks to it. This will be called by the heart beat
+   * Re-register with the master and report all blocks to it. This will be called by the heart beat
    * thread if our heartbeat to the block manager indicates that we were not registered.
    *
    * Note that this method must be called without any BlockInfo locks held.
    */
   def reregister() {
-    // TODO: We might need to rate limit reregistering.
-    logInfo("BlockManager reregistering with master")
+    // TODO: We might need to rate limit re-registering.
+    logInfo("BlockManager re-registering with master")
     master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
     reportAllBlocks()
   }
 
   /**
-   * Reregister with the master sometime soon.
+   * Re-register with the master sometime soon.
    */
   def asyncReregister() {
     asyncReregisterLock.synchronized {
@@ -192,7 +198,7 @@ private[spark] class BlockManager(
   }
 
   /**
-   * For testing. Wait for any pending asynchronous reregistration; otherwise, do nothing.
+   * For testing. Wait for any pending asynchronous re-registration; otherwise, do nothing.
    */
   def waitForAsyncReregister() {
     val task = asyncReregisterTask
@@ -211,15 +217,19 @@ private[spark] class BlockManager(
    * message reflecting the current status, *not* the desired storage level in its block info.
    * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
    *
-   * droppedMemorySize exists to account for when block is dropped from memory to disk (so it
-   * is still valid). This ensures that update in master will compensate for the increase in
+   * droppedMemorySize exists to account for when the block is dropped from memory to disk (so
+   * it is still valid). This ensures that update in master will compensate for the increase in
    * memory on slave.
    */
-  def reportBlockStatus(blockId: BlockId, info: BlockInfo, droppedMemorySize: Long = 0L) {
-    val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize)
+  def reportBlockStatus(
+      blockId: BlockId,
+      info: BlockInfo,
+      status: BlockStatus,
+      droppedMemorySize: Long = 0L) {
+    val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
     if (needReregister) {
-      logInfo("Got told to reregister updating block " + blockId)
-      // Reregistering will report our new block for free.
+      logInfo("Got told to re-register updating block " + blockId)
+      // Re-registering will report our new block for free.
       asyncReregister()
     }
     logDebug("Told master about block " + blockId)
@@ -230,27 +240,41 @@ private[spark] class BlockManager(
    * which will be true if the block was successfully recorded and false if
    * the slave needs to re-register.
    */
-  private def tryToReportBlockStatus(blockId: BlockId, info: BlockInfo,
+  private def tryToReportBlockStatus(
+      blockId: BlockId,
+      info: BlockInfo,
+      status: BlockStatus,
       droppedMemorySize: Long = 0L): Boolean = {
-    val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
+    if (info.tellMaster) {
+      val storageLevel = status.storageLevel
+      val inMemSize = Math.max(status.memSize, droppedMemorySize)
+      val onDiskSize = status.diskSize
+      master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
+    } else true
+  }
+
+  /**
+   * Return the updated storage status of the block with the given ID. More specifically, if
+   * the block is dropped from memory and possibly added to disk, return the new storage level
+   * and the updated in-memory and on-disk sizes.
+   */
+  private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
+    val (newLevel, inMemSize, onDiskSize) = info.synchronized {
       info.level match {
         case null =>
-          (StorageLevel.NONE, 0L, 0L, false)
+          (StorageLevel.NONE, 0L, 0L)
         case level =>
           val inMem = level.useMemory && memoryStore.contains(blockId)
           val onDisk = level.useDisk && diskStore.contains(blockId)
-          val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication)
-          val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize
+          val deserialized = if (inMem) level.deserialized else false
+          val replication = if (inMem || onDisk) level.replication else 1
+          val storageLevel = StorageLevel(onDisk, inMem, deserialized, replication)
+          val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
           val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
-          (storageLevel, memSize, diskSize, info.tellMaster)
+          (storageLevel, memSize, diskSize)
       }
     }
-
-    if (tellMaster) {
-      master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
-    } else {
-      true
-    }
+    BlockStatus(newLevel, inMemSize, onDiskSize)
   }
 
   /**
@@ -398,10 +422,10 @@ private[spark] class BlockManager(
   /**
    * Get block from remote block managers as serialized bytes.
    */
-   def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
     logDebug("Getting remote block " + blockId + " as bytes")
     doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
-   }
+  }
 
   private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
     require(blockId != null, "BlockId is null")
@@ -447,9 +471,8 @@ private[spark] class BlockManager(
    * so that we can control the maxMegabytesInFlight for the fetch.
    */
   def getMultiple(
-    blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], serializer: Serializer)
-      : BlockFetcherIterator = {
-
+      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
+      serializer: Serializer): BlockFetcherIterator = {
     val iter =
       if (conf.getBoolean("spark.shuffle.use.netty", false)) {
         new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -461,8 +484,11 @@ private[spark] class BlockManager(
     iter
   }
 
-  def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
-    : Long = {
+  def put(
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel,
+      tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
     doPut(blockId, IteratorValues(values), level, tellMaster)
   }
 
@@ -472,41 +498,58 @@ private[spark] class BlockManager(
    * This is currently used for writing shuffle files out. Callers should handle error
    * cases.
    */
-  def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
-    : BlockObjectWriter = {
+  def getDiskWriter(
+      blockId: BlockId,
+      file: File,
+      serializer: Serializer,
+      bufferSize: Int): BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
     val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
     new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
   }
 
   /**
-   * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
+   * Put a new block of values to the block manager. Return a list of blocks updated as a
+   * result of this put.
    */
-  def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
-          tellMaster: Boolean = true) : Long = {
+  def put(
+      blockId: BlockId,
+      values: ArrayBuffer[Any],
+      level: StorageLevel,
+      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
     require(values != null, "Values is null")
     doPut(blockId, ArrayBufferValues(values), level, tellMaster)
   }
 
   /**
-   * Put a new block of serialized bytes to the block manager.
+   * Put a new block of serialized bytes to the block manager. Return a list of blocks updated
+   * as a result of this put.
    */
-  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
-               tellMaster: Boolean = true) {
+  def putBytes(
+      blockId: BlockId,
+      bytes: ByteBuffer,
+      level: StorageLevel,
+      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
     require(bytes != null, "Bytes is null")
     doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
   }
 
-  private def doPut(blockId: BlockId,
-                    data: Values,
-                    level: StorageLevel, tellMaster: Boolean = true): Long = {
+  private def doPut(
+      blockId: BlockId,
+      data: Values,
+      level: StorageLevel,
+      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
 
+    // Return value
+    val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
     // Remember the block's storage level so that we can correctly drop it to disk if it needs
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
-    val myInfo = {
+    val putBlockInfo = {
       val tinfo = new BlockInfo(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
@@ -514,7 +557,7 @@ private[spark] class BlockManager(
       if (oldBlockOpt.isDefined) {
         if (oldBlockOpt.get.waitForReady()) {
           logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
-          return oldBlockOpt.get.size
+          return updatedBlocks
         }
 
         // TODO: So the block info exists - but previous attempt to load it (?) failed.
@@ -536,7 +579,7 @@ private[spark] class BlockManager(
     // Ditto for the bytes after the put
     var bytesAfterPut: ByteBuffer = null
 
-    // Size of the block in bytes (to return to caller)
+    // Size of the block in bytes
     var size = 0L
 
     // If we're storing bytes, then initiate the replication before storing them locally.
@@ -551,7 +594,7 @@ private[spark] class BlockManager(
       null
     }
 
-    myInfo.synchronized {
+    putBlockInfo.synchronized {
       logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")
 
@@ -566,7 +609,7 @@ private[spark] class BlockManager(
             case ArrayBufferValues(array) =>
               memoryStore.putValues(blockId, array, level, true)
             case ByteBufferValues(bytes) => {
-              bytes.rewind();
+              bytes.rewind()
               memoryStore.putBytes(blockId, bytes, level)
             }
           }
@@ -575,6 +618,8 @@ private[spark] class BlockManager(
             case Right(newBytes) => bytesAfterPut = newBytes
             case Left(newIterator) => valuesAfterPut = newIterator
           }
+          // Keep track of which blocks are dropped from memory
+          res.droppedBlocks.foreach { block => updatedBlocks += block }
         } else {
           // Save directly to disk.
           // Don't get back the bytes unless we replicate them.
@@ -586,7 +631,7 @@ private[spark] class BlockManager(
             case ArrayBufferValues(array) =>
               diskStore.putValues(blockId, array, level, askForBytes)
             case ByteBufferValues(bytes) => {
-              bytes.rewind();
+              bytes.rewind()
               diskStore.putBytes(blockId, bytes, level)
             }
           }
@@ -597,21 +642,25 @@ private[spark] class BlockManager(
           }
         }
 
-        // Now that the block is in either the memory or disk store, let other threads read it,
-        // and tell the master about it.
-        marked = true
-        myInfo.markReady(size)
-        if (tellMaster) {
-          reportBlockStatus(blockId, myInfo)
+        val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+        if (putBlockStatus.storageLevel != StorageLevel.NONE) {
+          // Now that the block is in either the memory or disk store, let other threads read it,
+          // and tell the master about it.
+          marked = true
+          putBlockInfo.markReady(size)
+          if (tellMaster) {
+            reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+          }
+          updatedBlocks += ((blockId, putBlockStatus))
         }
       } finally {
-        // If we failed at putting the block to memory/disk, notify other possible readers
+        // If we failed in putting the block to memory/disk, notify other possible readers
         // that it has failed, and then remove it from the block info map.
-        if (! marked) {
+        if (!marked) {
           // Note that the remove must happen before markFailure otherwise another thread
           // could've inserted a new BlockInfo before we remove it.
           blockInfo.remove(blockId)
-          myInfo.markFailure()
+          putBlockInfo.markFailure()
           logWarning("Putting block " + blockId + " failed")
         }
       }
@@ -650,7 +699,7 @@ private[spark] class BlockManager(
         Utils.getUsedTimeMs(startTimeMs))
     }
 
-    size
+    updatedBlocks
   }
 
   /**
@@ -687,28 +736,42 @@ private[spark] class BlockManager(
   /**
    * Write a block consisting of a single object.
    */
-  def putSingle(blockId: BlockId, value: Any, level: StorageLevel, tellMaster: Boolean = true) {
+  def putSingle(
+      blockId: BlockId,
+      value: Any,
+      level: StorageLevel,
+      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
     put(blockId, Iterator(value), level, tellMaster)
   }
 
   /**
    * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
    * store reaches its limit and needs to free up space.
+   *
+   * Return the block status if the given block has been updated, else None.
    */
-  def dropFromMemory(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer]) {
+  def dropFromMemory(
+      blockId: BlockId,
+      data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
+
     logInfo("Dropping block " + blockId + " from memory")
     val info = blockInfo.get(blockId).orNull
+
+    // If the block has not already been dropped
     if (info != null)  {
       info.synchronized {
         // required ? As of now, this will be invoked only for blocks which are ready
         // But in case this changes in future, adding for consistency sake.
-        if (! info.waitForReady() ) {
+        if (!info.waitForReady()) {
           // If we get here, the block write failed.
           logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
-          return
+          return None
         }
 
+        var blockIsUpdated = false
         val level = info.level
+
+        // Drop to disk, if storage level requires
         if (level.useDisk && !diskStore.contains(blockId)) {
           logInfo("Writing block " + blockId + " to disk")
           data match {
@@ -717,24 +780,33 @@ private[spark] class BlockManager(
             case Right(bytes) =>
               diskStore.putBytes(blockId, bytes, level)
           }
+          blockIsUpdated = true
         }
+
+        // Actually drop from memory store
         val droppedMemorySize =
           if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
-        val blockWasRemoved = memoryStore.remove(blockId)
-        if (!blockWasRemoved) {
+        val blockIsRemoved = memoryStore.remove(blockId)
+        if (blockIsRemoved) {
+          blockIsUpdated = true
+        } else {
           logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
         }
+
+        val status = getCurrentBlockStatus(blockId, info)
         if (info.tellMaster) {
-          reportBlockStatus(blockId, info, droppedMemorySize)
+          reportBlockStatus(blockId, info, status, droppedMemorySize)
         }
         if (!level.useDisk) {
           // The block is completely gone from this node; forget it so we can put() it again later.
           blockInfo.remove(blockId)
         }
+        if (blockIsUpdated) {
+          return Some(status)
+        }
       }
-    } else {
-      // The block has already been dropped
     }
+    None
   }
 
   /**
@@ -766,7 +838,8 @@ private[spark] class BlockManager(
       }
       blockInfo.remove(blockId)
       if (tellMaster && info.tellMaster) {
-        reportBlockStatus(blockId, info)
+        val status = getCurrentBlockStatus(blockId, info)
+        reportBlockStatus(blockId, info, status)
       }
     } else {
       // The block has already been removed; do nothing.
@@ -801,7 +874,8 @@ private[spark] class BlockManager(
           iterator.remove()
           logInfo("Dropped block " + id)
         }
-        reportBlockStatus(id, info)
+        val status = getCurrentBlockStatus(id, info)
+        reportBlockStatus(id, info, status)
       }
     }
   }
@@ -911,9 +985,8 @@ private[spark] object BlockManager extends Logging {
   def blockIdsToBlockManagers(
       blockIds: Array[BlockId],
       env: SparkEnv,
-      blockManagerMaster: BlockManagerMaster = null)
-  : Map[BlockId, Seq[BlockManagerId]] =
-  {
+      blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = {
+
     // blockManagerMaster != null is used in tests
     assert (env != null || blockManagerMaster != null)
     val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
@@ -932,18 +1005,14 @@ private[spark] object BlockManager extends Logging {
   def blockIdsToExecutorIds(
       blockIds: Array[BlockId],
       env: SparkEnv,
-      blockManagerMaster: BlockManagerMaster = null)
-    : Map[BlockId, Seq[String]] =
-  {
+      blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = {
     blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.executorId))
   }
 
   def blockIdsToHosts(
       blockIds: Array[BlockId],
       env: SparkEnv,
-      blockManagerMaster: BlockManagerMaster = null)
-    : Map[BlockId, Seq[String]] =
-  {
+      blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = {
     blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
   }
 }
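
The put paths in BlockManager now return the set of blocks whose storage status changed,
rather than the stored size. A minimal sketch of how a caller inside Spark might consume
that return value; the blockManager and values bindings are assumed for illustration, and
only the put signature and the BlockStatus fields come from this diff:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.storage._

    def cacheAndReport(blockManager: BlockManager, values: ArrayBuffer[Any]): Unit = {
      // put(...) now returns Seq[(BlockId, BlockStatus)]: the blocks updated by this
      // call, including any blocks dropped from memory to make room for the new one.
      val updatedBlocks =
        blockManager.put(RDDBlockId(0, 0), values, StorageLevel.MEMORY_AND_DISK)
      updatedBlocks.foreach { case (id, status) =>
        println(s"$id is now at ${status.storageLevel} " +
          s"(mem=${status.memSize}, disk=${status.diskSize})")
      }
    }

The same pattern applies to putBytes and putSingle, and dropFromMemory now returns an
Option[BlockStatus] describing the dropped block's new status, if it changed.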

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 98cd6e6..be537d7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -50,7 +50,6 @@ private[spark] class BlockManagerId private (
     // DEBUG code
     Utils.checkHost(host)
     assert (port > 0)
-
     host + ":" + port
   }
 
@@ -93,7 +92,7 @@ private[spark] class BlockManagerId private (
 private[spark] object BlockManagerId {
 
   /**
-   * Returns a [[org.apache.spark.storage.BlockManagerId]] for the given configuraiton.
+   * Returns a [[org.apache.spark.storage.BlockManagerId]] for the given configuration.
    *
    * @param execId ID of the executor.
    * @param host Host name of the block manager.

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e531467..ed69378 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -28,8 +28,7 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils
 
 private[spark]
-class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {
-
+class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
   val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
   val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
 
@@ -53,8 +52,7 @@ class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Lo
   }
 
   /** Register the BlockManager's id with the driver. */
-  def registerBlockManager(
-      blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     logInfo("Trying to register BlockManager")
     tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
     logInfo("Registered BlockManager")

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index a999d76..ff2652b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -28,6 +28,7 @@ import akka.actor.{Actor, ActorRef, Cancellable}
 import akka.pattern.ask
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -36,11 +37,11 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  * all slaves' block managers.
  */
 private[spark]
-class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Actor with Logging {
+class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus)
+  extends Actor with Logging {
 
   // Mapping from block manager id to the block manager's information.
-  private val blockManagerInfo =
-    new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
+  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
 
   // Mapping from executor ID to block manager ID.
   private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
@@ -160,6 +161,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
         blockLocations.remove(locations)
       }
     }
+    listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
   }
 
   private def expireDeadHosts() {
@@ -217,8 +219,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
 
   private def storageStatus: Array[StorageStatus] = {
     blockManagerInfo.map { case(blockManagerId, info) =>
-      import collection.JavaConverters._
-      StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
+      val blockMap = mutable.Map[BlockId, BlockStatus](info.blocks.toSeq: _*)
+      new StorageStatus(blockManagerId, info.maxMem, blockMap)
     }.toArray
   }
 
@@ -233,9 +235,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
         case None =>
           blockManagerIdByExecutor(id.executorId) = id
       }
-      blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
-        id, System.currentTimeMillis(), maxMemSize, slaveActor)
+      blockManagerInfo(id) =
+        new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
     }
+    listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
   }
 
   private def updateBlockInfo(
@@ -307,97 +310,96 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
 }
 
 
-private[spark]
-object BlockManagerMasterActor {
-
-  case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
 
-  class BlockManagerInfo(
-      val blockManagerId: BlockManagerId,
-      timeMs: Long,
-      val maxMem: Long,
-      val slaveActor: ActorRef)
-    extends Logging {
+private[spark] class BlockManagerInfo(
+    val blockManagerId: BlockManagerId,
+    timeMs: Long,
+    val maxMem: Long,
+    val slaveActor: ActorRef)
+  extends Logging {
 
-    private var _lastSeenMs: Long = timeMs
-    private var _remainingMem: Long = maxMem
+  private var _lastSeenMs: Long = timeMs
+  private var _remainingMem: Long = maxMem
 
-    // Mapping from block id to its status.
-    private val _blocks = new JHashMap[BlockId, BlockStatus]
+  // Mapping from block id to its status.
+  private val _blocks = new JHashMap[BlockId, BlockStatus]
 
-    logInfo("Registering block manager %s with %s RAM".format(
-      blockManagerId.hostPort, Utils.bytesToString(maxMem)))
+  logInfo("Registering block manager %s with %s RAM".format(
+    blockManagerId.hostPort, Utils.bytesToString(maxMem)))
 
-    def updateLastSeenMs() {
-      _lastSeenMs = System.currentTimeMillis()
-    }
+  def updateLastSeenMs() {
+    _lastSeenMs = System.currentTimeMillis()
+  }
 
-    def updateBlockInfo(blockId: BlockId, storageLevel: StorageLevel, memSize: Long,
-                        diskSize: Long) {
+  def updateBlockInfo(
+      blockId: BlockId,
+      storageLevel: StorageLevel,
+      memSize: Long,
+      diskSize: Long) {
 
-      updateLastSeenMs()
+    updateLastSeenMs()
 
-      if (_blocks.containsKey(blockId)) {
-        // The block exists on the slave already.
-        val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
+    if (_blocks.containsKey(blockId)) {
+      // The block exists on the slave already.
+      val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
 
-        if (originalLevel.useMemory) {
-          _remainingMem += memSize
-        }
+      if (originalLevel.useMemory) {
+        _remainingMem += memSize
       }
+    }
 
-      if (storageLevel.isValid) {
-        // isValid means it is either stored in-memory or on-disk.
-        // But the memSize here indicates the data size in or dropped from memory,
-        // and the diskSize here indicates the data size in or dropped to disk.
-        // They can be both larger than 0, when a block is dropped from memory to disk.
-        // Therefore, a safe way to set BlockStatus is to set its info in accurate modes.
-        if (storageLevel.useMemory) {
-          _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0))
-          _remainingMem -= memSize
-          logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
-            blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
-            Utils.bytesToString(_remainingMem)))
-        }
-        if (storageLevel.useDisk) {
-          _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize))
-          logInfo("Added %s on disk on %s (size: %s)".format(
-            blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
-        }
-      } else if (_blocks.containsKey(blockId)) {
-        // If isValid is not true, drop the block.
-        val blockStatus: BlockStatus = _blocks.get(blockId)
-        _blocks.remove(blockId)
-        if (blockStatus.storageLevel.useMemory) {
-          _remainingMem += blockStatus.memSize
-          logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
-            blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
-            Utils.bytesToString(_remainingMem)))
-        }
-        if (blockStatus.storageLevel.useDisk) {
-          logInfo("Removed %s on %s on disk (size: %s)".format(
-            blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
-        }
+    if (storageLevel.isValid) {
+      /* isValid means it is either stored in-memory or on-disk.
+       * But the memSize here indicates the data size in or dropped from memory,
+       * and the diskSize here indicates the data size in or dropped to disk.
+       * They can be both larger than 0, when a block is dropped from memory to disk.
+       * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
+      if (storageLevel.useMemory) {
+        _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0))
+        _remainingMem -= memSize
+        logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
+          blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
+          Utils.bytesToString(_remainingMem)))
+      }
+      if (storageLevel.useDisk) {
+        _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize))
+        logInfo("Added %s on disk on %s (size: %s)".format(
+          blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
+      }
+    } else if (_blocks.containsKey(blockId)) {
+      // If isValid is not true, drop the block.
+      val blockStatus: BlockStatus = _blocks.get(blockId)
+      _blocks.remove(blockId)
+      if (blockStatus.storageLevel.useMemory) {
+        _remainingMem += blockStatus.memSize
+        logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
+          blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
+          Utils.bytesToString(_remainingMem)))
+      }
+      if (blockStatus.storageLevel.useDisk) {
+        logInfo("Removed %s on %s on disk (size: %s)".format(
+          blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
       }
     }
+  }
 
-    def removeBlock(blockId: BlockId) {
-      if (_blocks.containsKey(blockId)) {
-        _remainingMem += _blocks.get(blockId).memSize
-        _blocks.remove(blockId)
-      }
+  def removeBlock(blockId: BlockId) {
+    if (_blocks.containsKey(blockId)) {
+      _remainingMem += _blocks.get(blockId).memSize
+      _blocks.remove(blockId)
     }
+  }
 
-    def remainingMem: Long = _remainingMem
+  def remainingMem: Long = _remainingMem
 
-    def lastSeenMs: Long = _lastSeenMs
+  def lastSeenMs: Long = _lastSeenMs
 
-    def blocks: JHashMap[BlockId, BlockStatus] = _blocks
+  def blocks: JHashMap[BlockId, BlockStatus] = _blocks
 
-    override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
+  override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
 
-    def clear() {
-      _blocks.clear()
-    }
+  def clear() {
+    _blocks.clear()
   }
 }
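
With BlockStatus and BlockManagerInfo promoted to top-level classes, the master actor also
posts block manager lifecycle events to the listener bus. A minimal sketch of a listener
reacting to those events; the class name is illustrative, while the event classes, their
fields, and the callback names appear elsewhere in this patch:

    import org.apache.spark.scheduler._

    class BlockManagerLoggingListener extends SparkListener {
      override def onBlockManagerAdded(added: SparkListenerBlockManagerAdded) {
        // Posted by the master actor when a new block manager registers (see above).
        println(s"Block manager ${added.blockManagerId} registered " +
          s"with ${added.maxMem} bytes of memory")
      }

      override def onBlockManagerRemoved(removed: SparkListenerBlockManagerRemoved) {
        // Posted when a block manager is removed from the master.
        println(s"Block manager ${removed.blockManagerId} removed")
      }
    }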

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 38836d4..488f1ea 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()
@@ -70,16 +70,15 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       blockId: BlockId,
       values: ArrayBuffer[Any],
       level: StorageLevel,
-      returnValues: Boolean)
-  : PutResult = {
+      returnValues: Boolean): PutResult = {
     if (level.deserialized) {
       val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
-      tryToPut(blockId, values, sizeEstimate, true)
-      PutResult(sizeEstimate, Left(values.toIterator))
+      val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true)
+      PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks)
     } else {
-      val bytes = blockManager.dataSerialize(blockId, values.toIterator)
-      tryToPut(blockId, bytes, bytes.limit, false)
-      PutResult(bytes.limit(), Right(bytes.duplicate()))
+      val bytes = blockManager.dataSerialize(blockId, values.iterator)
+      val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+      PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
     }
   }
 
@@ -87,20 +86,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      returnValues: Boolean)
-    : PutResult = {
-
-    if (level.deserialized) {
-      val valueEntries = new ArrayBuffer[Any]()
-      valueEntries ++= values
-      val sizeEstimate = SizeEstimator.estimate(valueEntries.asInstanceOf[AnyRef])
-      tryToPut(blockId, valueEntries, sizeEstimate, true)
-      PutResult(sizeEstimate, Left(valueEntries.toIterator))
-    } else {
-      val bytes = blockManager.dataSerialize(blockId, values)
-      tryToPut(blockId, bytes, bytes.limit, false)
-      PutResult(bytes.limit(), Right(bytes.duplicate()))
-    }
+      returnValues: Boolean): PutResult = {
+    val valueEntries = new ArrayBuffer[Any]()
+    valueEntries ++= values
+    putValues(blockId, valueEntries, level, returnValues)
   }
 
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -164,19 +153,34 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
    * size must also be passed by the caller.
    *
-   * Locks on the object putLock to ensure that all the put requests and its associated block
+   * Lock on the object putLock to ensure that all the put requests and its associated block
    * dropping is done by only on thread at a time. Otherwise while one thread is dropping
    * blocks to free memory for one block, another thread may use up the freed space for
    * another block.
+   *
+   * Return whether put was successful, along with the blocks dropped in the process.
    */
-  private def tryToPut(blockId: BlockId, value: Any, size: Long, deserialized: Boolean): Boolean = {
-    // TODO: Its possible to optimize the locking by locking entries only when selecting blocks
-    // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been
-    // released, it must be ensured that those to-be-dropped blocks are not double counted for
-    // freeing up more space for another block that needs to be put. Only then the actually dropping
-    // of blocks (and writing to disk if necessary) can proceed in parallel.
+  private def tryToPut(
+      blockId: BlockId,
+      value: Any,
+      size: Long,
+      deserialized: Boolean): ResultWithDroppedBlocks = {
+
+    /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
+     * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
+     * been released, it must be ensured that those to-be-dropped blocks are not double counted
+     * for freeing up more space for another block that needs to be put. Only then the actually
+     * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
+
+    var putSuccess = false
+    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
     putLock.synchronized {
-      if (ensureFreeSpace(blockId, size)) {
+      val freeSpaceResult = ensureFreeSpace(blockId, size)
+      val enoughFreeSpace = freeSpaceResult.success
+      droppedBlocks ++= freeSpaceResult.droppedBlocks
+
+      if (enoughFreeSpace) {
         val entry = new Entry(value, size, deserialized)
         entries.synchronized {
           entries.put(blockId, entry)
@@ -189,7 +193,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
             blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
         }
-        true
+        putSuccess = true
       } else {
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
         // disk if the block allows disk storage.
@@ -198,29 +202,33 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         } else {
           Right(value.asInstanceOf[ByteBuffer].duplicate())
         }
-        blockManager.dropFromMemory(blockId, data)
-        false
+        val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+        droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
       }
     }
+    ResultWithDroppedBlocks(putSuccess, droppedBlocks)
   }
 
   /**
-   * Tries to free up a given amount of space to store a particular block, but can fail and return
-   * false if either the block is bigger than our memory or it would require replacing another
-   * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
+   * Try to free up a given amount of space to store a particular block, but can fail if
+   * either the block is bigger than our memory or it would require replacing another block
+   * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
    * don't fit into memory that we want to avoid).
    *
-   * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
+   * Assume that a lock is held by the caller to ensure only one thread is dropping blocks.
    * Otherwise, the freed space may fill up before the caller puts in their new value.
+   *
+   * Return whether there is enough free space, along with the blocks dropped in the process.
    */
-  private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): Boolean = {
-
+  private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = {
     logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
       space, currentMemory, maxMemory))
 
+    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
     if (space > maxMemory) {
       logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
-      return false
+      return ResultWithDroppedBlocks(success = false, droppedBlocks)
     }
 
     if (maxMemory - currentMemory < space) {
@@ -256,17 +264,18 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
             } else {
               Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
             }
-            blockManager.dropFromMemory(blockId, data)
+            val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+            droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
           }
         }
-        return true
+        return ResultWithDroppedBlocks(success = true, droppedBlocks)
       } else {
         logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " +
           "from the same RDD")
-        return false
+        return ResultWithDroppedBlocks(success = false, droppedBlocks)
       }
     }
-    true
+    ResultWithDroppedBlocks(success = true, droppedBlocks)
   }
 
   override def contains(blockId: BlockId): Boolean = {
@@ -274,3 +283,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 }
 
+private case class ResultWithDroppedBlocks(
+    success: Boolean,
+    droppedBlocks: Seq[(BlockId, BlockStatus)])
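
tryToPut and ensureFreeSpace now report evictions through ResultWithDroppedBlocks instead
of a bare Boolean, and the dropped blocks are threaded into the PutResult returned by
putValues. A minimal sketch of the consumption pattern, mirroring BlockManager.doPut
earlier in this patch; the memoryStore, blockId, values and level bindings are assumed:

    // Inside the storage package: surface blocks evicted while making room for this put.
    val result: PutResult = memoryStore.putValues(blockId, values, level, returnValues = true)
    result.droppedBlocks.foreach { case (droppedId, status) =>
      // Each entry is a block dropped from memory (and possibly written to disk),
      // together with its new storage status.
      println(s"Dropped $droppedId; now at ${status.storageLevel}")
    }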

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/PutResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala
index 2eba2f0..f0eac75 100644
--- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala
+++ b/core/src/main/scala/org/apache/spark/storage/PutResult.scala
@@ -20,7 +20,13 @@ package org.apache.spark.storage
 import java.nio.ByteBuffer
 
 /**
- * Result of adding a block into a BlockStore. Contains its estimated size, and possibly the
- * values put if the caller asked for them to be returned (e.g. for chaining replication)
+ * Result of adding a block into a BlockStore. This case class contains a few things:
+ *   (1) The estimated size of the put,
+ *   (2) The values put if the caller asked for them to be returned (e.g. for chaining
+ *       replication), and
+ *   (3) A list of blocks dropped as a result of this put. This is always empty for DiskStore.
  */
-private[spark] case class PutResult(size: Long, data: Either[Iterator[_], ByteBuffer])
+private[spark] case class PutResult(
+    size: Long,
+    data: Either[Iterator[_], ByteBuffer],
+    droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty)
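
Since droppedBlocks defaults to an empty sequence (and, per the comment above, is always
empty for DiskStore), a two-argument PutResult construction remains valid. A small
illustrative construction with a placeholder byte buffer:

    import java.nio.ByteBuffer
    import org.apache.spark.storage.PutResult

    // Disk-style result: size plus serialized bytes, no blocks dropped along the way.
    val diskResult = PutResult(1024L, Right(ByteBuffer.allocate(1024)))
    assert(diskResult.droppedBlocks.isEmpty)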

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
new file mode 100644
index 0000000..26565f5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable
+
+import org.apache.spark.scheduler._
+
+/**
+ * A SparkListener that maintains executor storage status
+ */
+private[spark] class StorageStatusListener extends SparkListener {
+  private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+
+  def storageStatusList = executorIdToStorageStatus.values.toSeq
+
+  /** Update storage status list to reflect updated block statuses */
+  def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
+    val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
+    filteredStatus.foreach { storageStatus =>
+      updatedBlocks.foreach { case (blockId, updatedStatus) =>
+        storageStatus.blocks(blockId) = updatedStatus
+      }
+    }
+  }
+
+  /** Update storage status list to reflect the removal of an RDD from the cache */
+  def updateStorageStatus(unpersistedRDDId: Int) {
+    storageStatusList.foreach { storageStatus =>
+      val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
+      unpersistedBlocksIds.foreach { blockId =>
+        storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L)
+      }
+    }
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+    val info = taskEnd.taskInfo
+    val metrics = taskEnd.taskMetrics
+    if (info != null && metrics != null) {
+      val execId = formatExecutorId(info.executorId)
+      val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+      if (updatedBlocks.length > 0) {
+        updateStorageStatus(execId, updatedBlocks)
+      }
+    }
+  }
+
+  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
+    updateStorageStatus(unpersistRDD.rddId)
+  }
+
+  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) {
+    synchronized {
+      val blockManagerId = blockManagerAdded.blockManagerId
+      val executorId = blockManagerId.executorId
+      val maxMem = blockManagerAdded.maxMem
+      val storageStatus = new StorageStatus(blockManagerId, maxMem)
+      executorIdToStorageStatus(executorId) = storageStatus
+    }
+  }
+
+  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
+    synchronized {
+      val executorId = blockManagerRemoved.blockManagerId.executorId
+      executorIdToStorageStatus.remove(executorId)
+    }
+  }
+
+  /**
+   * In the local mode, there is a discrepancy between the executor ID according to the
+   * task ("localhost") and that according to SparkEnv ("<driver>"). In the UI, this
+   * results in duplicate rows for the same executor. Thus, in this mode, we aggregate
+   * these two rows and use the executor ID of "<driver>" to be consistent.
+   */
+  def formatExecutorId(execId: String): String = {
+    if (execId == "localhost") "<driver>" else execId
+  }
+}
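
A minimal sketch of wiring the new listener into a running application and reading the
state it maintains; registration via SparkContext.addSparkListener is an assumption here
(it is not part of this diff), while storageStatusList and the StorageStatus accessors are:

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageStatusListener

    def reportExecutorStorage(sc: SparkContext): Unit = {
      val listener = new StorageStatusListener
      sc.addSparkListener(listener)
      // Later (e.g. from a UI page): one StorageStatus per live executor.
      listener.storageStatusList.foreach { status =>
        println(s"${status.blockManagerId.executorId}: " +
          s"${status.memUsed()} of ${status.maxMem} bytes used")
      }
    }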

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 2d88a40..6153dfe 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -17,13 +17,17 @@
 
 package org.apache.spark.storage
 
+import scala.collection.Map
+import scala.collection.mutable
+
 import org.apache.spark.SparkContext
-import org.apache.spark.storage.BlockManagerMasterActor.BlockStatus
 import org.apache.spark.util.Utils
 
 private[spark]
-case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
-  blocks: Map[BlockId, BlockStatus]) {
+class StorageStatus(
+    val blockManagerId: BlockManagerId,
+    val maxMem: Long,
+    val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
 
   def memUsed() = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
 
@@ -43,14 +47,18 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
   }
 }
 
-case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
-  numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
+private[spark]
+class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storageLevel: StorageLevel)
   extends Ordered[RDDInfo] {
+
+  var numCachedPartitions = 0
+  var memSize = 0L
+  var diskSize = 0L
+
   override def toString = {
-    import Utils.bytesToString
     ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
        "DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions,
-         numPartitions, bytesToString(memSize), bytesToString(diskSize))
+         numPartitions, Utils.bytesToString(memSize), Utils.bytesToString(diskSize))
   }
 
   override def compare(that: RDDInfo) = {
@@ -62,55 +70,76 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
 private[spark]
 object StorageUtils {
 
-  /* Returns RDD-level information, compiled from a list of StorageStatus objects */
-  def rddInfoFromStorageStatus(storageStatusList: Seq[StorageStatus],
-    sc: SparkContext) : Array[RDDInfo] = {
-    rddInfoFromBlockStatusList(
-      storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc)
+  /**
+   * Returns basic information of all RDDs persisted in the given SparkContext. This does not
+   * include storage information.
+   */
+  def rddInfoFromSparkContext(sc: SparkContext): Array[RDDInfo] = {
+    sc.persistentRdds.values.map { rdd =>
+      val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
+      val rddNumPartitions = rdd.partitions.size
+      val rddStorageLevel = rdd.getStorageLevel
+      val rddInfo = new RDDInfo(rdd.id, rddName, rddNumPartitions, rddStorageLevel)
+      rddInfo
+    }.toArray
   }
 
-  /* Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */
-  def blockLocationsFromStorageStatus(storageStatusList: Seq[StorageStatus]) = {
-    val blockLocationPairs = storageStatusList
-      .flatMap(s => s.blocks.map(b => (b._1, s.blockManagerId.hostPort)))
-    blockLocationPairs.groupBy(_._1).map{case (k, v) => (k, v.unzip._2)}.toMap
+  /** Returns storage information of all RDDs persisted in the given SparkContext. */
+  def rddInfoFromStorageStatus(
+      storageStatuses: Seq[StorageStatus],
+      sc: SparkContext): Array[RDDInfo] = {
+    rddInfoFromStorageStatus(storageStatuses, rddInfoFromSparkContext(sc))
   }
 
-  /* Given a list of BlockStatus objets, returns information for each RDD */
-  def rddInfoFromBlockStatusList(infos: Map[RDDBlockId, BlockStatus],
-    sc: SparkContext) : Array[RDDInfo] = {
+  /** Returns storage information of all RDDs in the given list. */
+  def rddInfoFromStorageStatus(
+      storageStatuses: Seq[StorageStatus],
+      rddInfos: Seq[RDDInfo]): Array[RDDInfo] = {
+
+    // Mapping from RDD ID -> an array of associated BlockStatuses
+    val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap
+      .groupBy { case (k, _) => k.rddId }
+      .mapValues(_.values.toArray)
 
-    // Group by rddId, ignore the partition name
-    val groupedRddBlocks = infos.groupBy { case(k, v) => k.rddId }.mapValues(_.values.toArray)
+    // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
+    val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
 
-    // For each RDD, generate an RDDInfo object
-    val rddInfos = groupedRddBlocks.map { case (rddId, rddBlocks) =>
+    val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
       // Add up memory and disk sizes
-      val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
-      val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)
-
-      // Get the friendly name and storage level for the RDD, if available
-      sc.persistentRdds.get(rddId).map { r =>
-        val rddName = Option(r.name).getOrElse(rddId.toString)
-        val rddStorageLevel = r.getStorageLevel
-        RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size,
-          memSize, diskSize)
+      val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize > 0 }
+      val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+      val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+      rddInfoMap.get(rddId).map { rddInfo =>
+        rddInfo.numCachedPartitions = persistedBlocks.length
+        rddInfo.memSize = memSize
+        rddInfo.diskSize = diskSize
+        rddInfo
       }
-    }.flatten.toArray
+    }.toArray
 
-    scala.util.Sorting.quickSort(rddInfos)
-
-    rddInfos
+    scala.util.Sorting.quickSort(rddStorageInfos)
+    rddStorageInfos
   }
 
-  /* Filters storage status by a given RDD id. */
-  def filterStorageStatusByRDD(storageStatusList: Array[StorageStatus], rddId: Int)
-    : Array[StorageStatus] = {
-
-    storageStatusList.map { status =>
-      val newBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toMap[BlockId, BlockStatus]
-      //val newRemainingMem = status.maxMem - newBlocks.values.map(_.memSize).reduce(_ + _)
-      StorageStatus(status.blockManagerId, status.maxMem, newBlocks)
+  /** Returns a mapping from BlockId to the locations of the associated block. */
+  def blockLocationsFromStorageStatus(
+      storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
+    val blockLocationPairs = storageStatuses.flatMap { storageStatus =>
+      storageStatus.blocks.map { case (bid, _) => (bid, storageStatus.blockManagerId.hostPort) }
     }
+    blockLocationPairs.toMap
+      .groupBy { case (blockId, _) => blockId }
+      .mapValues(_.values.toSeq)
+  }
+
+  /** Filters the given list of StorageStatus by the given RDD ID. */
+  def filterStorageStatusByRDD(
+      storageStatuses: Seq[StorageStatus],
+      rddId: Int): Array[StorageStatus] = {
+    storageStatuses.map { status =>
+      val filteredBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toSeq
+      val filteredBlockMap = mutable.Map[BlockId, BlockStatus](filteredBlocks: _*)
+      new StorageStatus(status.blockManagerId, status.maxMem, filteredBlockMap)
+    }.toArray
   }
 }
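
The refactored StorageUtils splits RDD information into two steps: rddInfoFromSparkContext builds the static description of each persisted RDD, and rddInfoFromStorageStatus(storageStatuses, rddInfos) fills in the storage columns from the executors' block reports. A minimal sketch of that flow, assuming code living in the org.apache.spark package (RDDInfo, StorageStatus and StorageUtils are private[spark]); the RddStorageReport wrapper and its method name are illustrative only:

package org.apache.spark

import org.apache.spark.storage.{StorageStatus, StorageUtils}

object RddStorageReport {
  // Combine the two halves of the new API: static RDD info from the SparkContext,
  // storage sizes from the executors' StorageStatus reports.
  def reportRddStorage(sc: SparkContext, storageStatuses: Seq[StorageStatus]): Unit = {
    val rddInfos = StorageUtils.rddInfoFromSparkContext(sc)   // id, name, #partitions, storage level
    // Equivalent one-step call: StorageUtils.rddInfoFromStorageStatus(storageStatuses, sc)
    val withStorage = StorageUtils.rddInfoFromStorageStatus(storageStatuses, rddInfos.toSeq)
    // RDDInfo.toString reports cached partitions, memory size and disk size
    withStorage.foreach(info => println(info))
  }
}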

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 36f2a0f..226ed2a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -22,9 +22,9 @@ import java.util.concurrent.ArrayBlockingQueue
 import akka.actor._
 import util.Random
 
-import org.apache.spark.SparkConf
-import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
 
 /**
  * This class tests the BlockManager and MemoryStore for thread safety and
@@ -97,7 +97,8 @@ private[spark] object ThreadingTest {
     val conf = new SparkConf()
     val serializer = new KryoSerializer(conf)
     val blockManagerMaster = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      conf)
     val blockManager = new BlockManager(
       "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
       new SecurityManager(conf))

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index e0555ca..6e1736f 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -17,30 +17,29 @@
 
 package org.apache.spark.ui
 
-import java.net.InetSocketAddress
-import java.net.URL
-import javax.servlet.http.{HttpServlet, HttpServletResponse, HttpServletRequest}
+import java.net.{InetSocketAddress, URL}
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
 import scala.annotation.tailrec
 import scala.util.{Failure, Success, Try}
 import scala.xml.Node
 
-import org.json4s.JValue
-import org.json4s.jackson.JsonMethods.{pretty, render}
-
 import org.eclipse.jetty.server.{DispatcherType, Server}
-import org.eclipse.jetty.server.handler.HandlerList
-import org.eclipse.jetty.servlet.{DefaultServlet, FilterHolder, ServletContextHandler, ServletHolder}
+import org.eclipse.jetty.server.handler._
+import org.eclipse.jetty.servlet._
 import org.eclipse.jetty.util.thread.QueuedThreadPool
+import org.json4s.JValue
+import org.json4s.jackson.JsonMethods.{pretty, render}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
-
-/** Utilities for launching a web server using Jetty's HTTP Server class */
+/**
+ * Utilities for launching a web server using Jetty's HTTP Server class
+ */
 private[spark] object JettyUtils extends Logging {
+
   // Base type for a function that returns something based on an HTTP request. Allows for
   // implicit conversion from many types of functions to jetty Handlers.
-
   type Responder[T] = HttpServletRequest => T
 
   class ServletParams[T <% AnyRef](val responder: Responder[T],
@@ -57,62 +56,73 @@ private[spark] object JettyUtils extends Logging {
   implicit def textResponderToServlet(responder: Responder[String]): ServletParams[String] =
     new ServletParams(responder, "text/plain")
 
-  def createServlet[T <% AnyRef](servletParams: ServletParams[T],
+  def createServlet[T <% AnyRef](
+      servletParams: ServletParams[T],
       securityMgr: SecurityManager): HttpServlet = {
     new HttpServlet {
-      override def doGet(request: HttpServletRequest,
-                 response: HttpServletResponse) {
-        if (securityMgr.checkUIViewPermissions(request.getRemoteUser())) {
+      override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
+        if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
           response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
           response.setStatus(HttpServletResponse.SC_OK)
           val result = servletParams.responder(request)
           response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
-          response.getWriter().println(servletParams.extractFn(result))
+          response.getWriter.println(servletParams.extractFn(result))
         } else {
           response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
           response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
           response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
-            "User is not authorized to access this page.");
+            "User is not authorized to access this page.")
         }
       }
     }
   }
 
-  def createServletHandler(path: String, servlet: HttpServlet): ServletContextHandler = {
-    val contextHandler = new ServletContextHandler()
+  /** Create a context handler that responds to a request with the given path prefix */
+  def createServletHandler[T <% AnyRef](
+      path: String,
+      servletParams: ServletParams[T],
+      securityMgr: SecurityManager,
+      basePath: String = ""): ServletContextHandler = {
+    createServletHandler(path, createServlet(servletParams, securityMgr), basePath)
+  }
+
+  /** Create a context handler that responds to a request with the given path prefix */
+  def createServletHandler(
+      path: String,
+      servlet: HttpServlet,
+      basePath: String = ""): ServletContextHandler = {
+    val prefixedPath = attachPrefix(basePath, path)
+    val contextHandler = new ServletContextHandler
     val holder = new ServletHolder(servlet)
-    contextHandler.setContextPath(path)
+    contextHandler.setContextPath(prefixedPath)
     contextHandler.addServlet(holder, "/")
     contextHandler
   }
 
-  /** Creates a handler that always redirects the user to a given path */
-  def createRedirectHandler(newPath: String, path: String): ServletContextHandler = {
+  /** Create a handler that always redirects the user to the given path */
+  def createRedirectHandler(
+      srcPath: String,
+      destPath: String,
+      basePath: String = ""): ServletContextHandler = {
+    val prefixedDestPath = attachPrefix(basePath, destPath)
     val servlet = new HttpServlet {
-      override def doGet(request: HttpServletRequest,
-                 response: HttpServletResponse) {
-        // make sure we don't end up with // in the middle
-        val newUri = new URL(new URL(request.getRequestURL.toString), newPath).toURI
-        response.sendRedirect(newUri.toString)
+      override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
+        // Make sure we don't end up with "//" in the middle
+        val newUrl = new URL(new URL(request.getRequestURL.toString), prefixedDestPath).toString
+        response.sendRedirect(newUrl)
       }
     }
-    val contextHandler = new ServletContextHandler()
-    val holder = new ServletHolder(servlet)
-    contextHandler.setContextPath(path)
-    contextHandler.addServlet(holder, "/")
-    contextHandler
+    createServletHandler(srcPath, servlet, basePath)
   }
 
-  /** Creates a handler for serving files from a static directory */
+  /** Create a handler for serving files from a static directory */
   def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
-    val contextHandler = new ServletContextHandler()
+    val contextHandler = new ServletContextHandler
     val staticHandler = new DefaultServlet
     val holder = new ServletHolder(staticHandler)
     Option(getClass.getClassLoader.getResource(resourceBase)) match {
       case Some(res) =>
         holder.setInitParameter("resourceBase", res.toString)
-        holder.setInitParameter("welcomeServlets", "false")
-        holder.setInitParameter("pathInfoOnly", "false")
       case None =>
         throw new Exception("Could not find resource path for Web UI: " + resourceBase)
     }
@@ -121,6 +131,7 @@ private[spark] object JettyUtils extends Logging {
     contextHandler
   }
 
+  /** Add security filters, if any, to the given list of ServletContextHandlers */
   private def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
     val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
     filters.foreach {
@@ -129,7 +140,7 @@ private[spark] object JettyUtils extends Logging {
           logInfo("Adding filter: " + filter)
           val holder : FilterHolder = new FilterHolder()
           holder.setClassName(filter)
-          // get any parameters for each filter
+          // Get any parameters for each filter
           val paramName = "spark." + filter + ".params"
           val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
           params.foreach {
@@ -147,18 +158,21 @@ private[spark] object JettyUtils extends Logging {
   }
 
   /**
-   * Attempts to start a Jetty server at the supplied hostName:port which uses the supplied
-   * handlers.
+   * Attempt to start a Jetty server bound to the supplied hostName:port using the given
+   * context handlers.
    *
-   * If the desired port number is contented, continues incrementing ports until a free port is
-   * found. Returns the chosen port and the jetty Server object.
+   * If the desired port number is contended, continue incrementing ports until a free port is
+   * found. Return the Jetty Server object, the chosen port, and a mutable collection of handlers.
    */
-  def startJettyServer(hostName: String, port: Int, handlers: Seq[ServletContextHandler],
-      conf: SparkConf): (Server, Int) = {
-
+  def startJettyServer(
+      hostName: String,
+      port: Int,
+      handlers: Seq[ServletContextHandler],
+      conf: SparkConf): ServerInfo = {
+
+    val collection = new ContextHandlerCollection
+    collection.setHandlers(handlers.toArray)
     addFilters(handlers, conf)
-    val handlerList = new HandlerList
-    handlerList.setHandlers(handlers.toArray)
 
     @tailrec
     def connect(currentPort: Int): (Server, Int) = {
@@ -166,7 +180,7 @@ private[spark] object JettyUtils extends Logging {
       val pool = new QueuedThreadPool
       pool.setDaemon(true)
       server.setThreadPool(pool)
-      server.setHandler(handlerList)
+      server.setHandler(collection)
 
       Try {
         server.start()
@@ -181,6 +195,17 @@ private[spark] object JettyUtils extends Logging {
       }
     }
 
-    connect(port)
+    val (server, boundPort) = connect(port)
+    ServerInfo(server, boundPort, collection)
+  }
+
+  /** Attach a prefix to the given path, but avoid returning an empty path */
+  private def attachPrefix(basePath: String, relativePath: String): String = {
+    if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/")
   }
 }
+
+private[spark] case class ServerInfo(
+    server: Server,
+    boundPort: Int,
+    rootHandler: ContextHandlerCollection)
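
startJettyServer now returns a ServerInfo (server, bound port, and the root ContextHandlerCollection) rather than a (Server, Int) pair, and the handler factories take an optional basePath prefix. A minimal sketch of standing up a prefixed UI with these utilities, assuming code in the org.apache.spark package (JettyUtils and ServerInfo are private[spark]); the "/ping" servlet, port 8080 and the "/history/app-1234" base path are illustrative only:

package org.apache.spark

import javax.servlet.http.HttpServletRequest

import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.ServerInfo

object PingServer {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    val basePath = "/history/app-1234"   // hypothetical prefix for a persisted UI

    val handlers = Seq(
      // The Responder[String] is implicitly wrapped into a text/plain ServletParams
      createServletHandler("/ping", (request: HttpServletRequest) => "pong",
        securityManager, basePath),
      createRedirectHandler("/", "/ping", basePath)
    )

    // Binds on the requested port, incrementing on contention; the returned
    // ContextHandlerCollection allows handlers to be attached after the server starts.
    val serverInfo: ServerInfo = startJettyServer("0.0.0.0", 8080, handlers, conf)
    println("Bound to port " + serverInfo.boundPort)
  }
}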

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 5f0dee6..fd638c8 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.ui
 
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.servlet.ServletContextHandler
 
-import org.apache.spark.{Logging, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.env.EnvironmentUI
 import org.apache.spark.ui.exec.ExecutorsUI
@@ -31,34 +30,57 @@ import org.apache.spark.ui.storage.BlockManagerUI
 import org.apache.spark.util.Utils
 
 /** Top level user interface for Spark */
-private[spark] class SparkUI(sc: SparkContext) extends Logging {
-  val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
-  val port = sc.conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
-  var boundPort: Option[Int] = None
-  var server: Option[Server] = None
-
-  val handlers = Seq[ServletContextHandler] (
-    createStaticHandler(SparkUI.STATIC_RESOURCE_DIR + "/static", "/static"),
-    createRedirectHandler("/stages", "/")
-  )
-  val storage = new BlockManagerUI(sc)
-  val jobs = new JobProgressUI(sc)
-  val env = new EnvironmentUI(sc)
-  val exec = new ExecutorsUI(sc)
-
-  // Add MetricsServlet handlers by default
-  val metricsServletHandlers = SparkEnv.get.metricsSystem.getServletHandlers
-
-  val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
-    exec.getHandlers ++ metricsServletHandlers ++ handlers
+private[spark] class SparkUI(
+    val sc: SparkContext,
+    conf: SparkConf,
+    val listenerBus: SparkListenerBus,
+    val appName: String,
+    val basePath: String = "")
+  extends Logging {
+
+  def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName)
+  def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
+    this(null, conf, listenerBus, appName, basePath)
+
+  // If SparkContext is not provided, assume the associated application is not live
+  val live = sc != null
+
+  val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
+
+  private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
+  private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
+  private var serverInfo: Option[ServerInfo] = None
+
+  private val storage = new BlockManagerUI(this)
+  private val jobs = new JobProgressUI(this)
+  private val env = new EnvironmentUI(this)
+  private val exec = new ExecutorsUI(this)
+
+  val handlers: Seq[ServletContextHandler] = {
+    val metricsServletHandlers = if (live) {
+      SparkEnv.get.metricsSystem.getServletHandlers
+    } else {
+      Array[ServletContextHandler]()
+    }
+    storage.getHandlers ++
+    jobs.getHandlers ++
+    env.getHandlers ++
+    exec.getHandlers ++
+    metricsServletHandlers ++
+    Seq[ServletContextHandler] (
+      createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"),
+      createRedirectHandler("/", "/stages", basePath)
+    )
+  }
+
+  // Maintain executor storage status through Spark events
+  val storageStatusListener = new StorageStatusListener
 
   /** Bind the HTTP server which backs this web interface */
   def bind() {
     try {
-      val (srv, usedPort) = JettyUtils.startJettyServer(host, port, allHandlers, sc.conf)
-      logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort))
-      server = Some(srv)
-      boundPort = Some(usedPort)
+      serverInfo = Some(startJettyServer(host, port, handlers, conf))
+      logInfo("Started Spark Web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>
         logError("Failed to create Spark JettyUtils", e)
@@ -66,25 +88,34 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
     }
   }
 
+  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
+
   /** Initialize all components of the server */
   def start() {
-    // NOTE: This is decoupled from bind() because of the following dependency cycle:
-    //  DAGScheduler() requires that the port of this server is known
-    //  This server must register all handlers, including JobProgressUI, before binding
-    //  JobProgressUI registers a listener with SparkContext, which requires sc to initialize
+    storage.start()
     jobs.start()
+    env.start()
     exec.start()
+
+    // Storage status listener must receive events first, as other listeners depend on its state
+    listenerBus.addListener(storageStatusListener)
+    listenerBus.addListener(storage.listener)
+    listenerBus.addListener(jobs.listener)
+    listenerBus.addListener(env.listener)
+    listenerBus.addListener(exec.listener)
   }
 
   def stop() {
-    server.foreach(_.stop())
+    assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not bound to a server!")
+    serverInfo.get.server.stop()
+    logInfo("Stopped Spark Web UI at %s".format(appUIAddress))
   }
 
-  private[spark] def appUIAddress = host + ":" + boundPort.getOrElse("-1")
+  private[spark] def appUIAddress = "http://" + host + ":" + boundPort
 
 }
 
 private[spark] object SparkUI {
   val DEFAULT_PORT = "4040"
-  val STATIC_RESOURCE_DIR = "org/apache/spark/ui"
+  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
 }
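
The point of decoupling SparkUI from SparkContext is that a UI can now be rebuilt for an application that is no longer running, fed purely through a SparkListenerBus. A minimal sketch of that construction, assuming code in the org.apache.spark package; the renderPersistedUI helper and the "/history/..." base path are illustrative, and the bus is whatever SparkListenerBus the caller replays events through:

package org.apache.spark

import org.apache.spark.scheduler.SparkListenerBus
import org.apache.spark.ui.SparkUI

object PersistedUI {
  def renderPersistedUI(conf: SparkConf, bus: SparkListenerBus, appName: String): SparkUI = {
    // No SparkContext: `live` is false, so a fresh SecurityManager is built from the conf
    // and the metrics servlet handlers are skipped.
    val ui = new SparkUI(conf, bus, appName, "/history/" + appName)
    ui.start()   // registers the storage, jobs, environment and executors listeners on the bus
    ui.bind()    // binds Jetty on spark.ui.port, incrementing on contention
    ui
  }
}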

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 547a194..a487924 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -19,38 +19,43 @@ package org.apache.spark.ui
 
 import scala.xml.Node
 
-import org.apache.spark.SparkContext
-
 /** Utility functions for generating XML pages with spark content. */
 private[spark] object UIUtils {
+
   import Page._
 
   // Yarn has to go through a proxy so the base uri is provided and has to be on all links
   private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
     getOrElse("")
 
-  def prependBaseUri(resource: String = "") = uiRoot + resource
+  def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
 
   /** Returns a spark page with correctly formatted headers */
-  def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
-  : Seq[Node] = {
+  def headerSparkPage(
+      content: => Seq[Node],
+      basePath: String,
+      appName: String,
+      title: String,
+      page: Page.Value): Seq[Node] = {
     val jobs = page match {
-      case Stages => <li class="active"><a href={prependBaseUri("/stages")}>Stages</a></li>
-      case _ => <li><a href={prependBaseUri("/stages")}>Stages</a></li>
+      case Stages =>
+        <li class="active"><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
+      case _ => <li><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
     }
     val storage = page match {
-      case Storage => <li class="active"><a href={prependBaseUri("/storage")}>Storage</a></li>
-      case _ => <li><a href={prependBaseUri("/storage")}>Storage</a></li>
+      case Storage =>
+        <li class="active"><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
+      case _ => <li><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
     }
     val environment = page match {
       case Environment => 
-        <li class="active"><a href={prependBaseUri("/environment")}>Environment</a></li>
-      case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
+        <li class="active"><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
+      case _ => <li><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
     }
     val executors = page match {
       case Executors =>
-        <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
-      case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
+        <li class="active"><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
+      case _ => <li><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
     }
 
     <html>
@@ -58,14 +63,15 @@ private[spark] object UIUtils {
         <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
         <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")}
               type="text/css" />
-        <link rel="stylesheet" href={prependBaseUri("/static/webui.css")}  type="text/css" />
+        <link rel="stylesheet" href={prependBaseUri("/static/webui.css")}
+              type="text/css" />
         <script src={prependBaseUri("/static/sorttable.js")} ></script>
-        <title>{sc.appName} - {title}</title>
+        <title>{appName} - {title}</title>
       </head>
       <body>
         <div class="navbar navbar-static-top">
           <div class="navbar-inner">
-            <a href={prependBaseUri("/")} class="brand">
+            <a href={prependBaseUri(basePath, "/")} class="brand">
               <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
             </a>
             <ul class="nav">
@@ -74,7 +80,7 @@ private[spark] object UIUtils {
               {environment}
               {executors}
             </ul>
-            <p class="navbar-text pull-right"><strong>{sc.appName}</strong> application UI</p>
+            <p class="navbar-text pull-right"><strong>{appName}</strong> application UI</p>
           </div>
         </div>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index 1433347..23e90c3 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -19,76 +19,74 @@ package org.apache.spark.ui.env
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.JavaConversions._
-import scala.util.Properties
 import scala.xml.Node
 
 import org.eclipse.jetty.servlet.ServletContextHandler
 
-import org.apache.spark.SparkContext
+import org.apache.spark.scheduler._
+import org.apache.spark.ui._
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.Page.Environment
-import org.apache.spark.ui.UIUtils
 
-private[spark] class EnvironmentUI(sc: SparkContext) {
+private[ui] class EnvironmentUI(parent: SparkUI) {
+  private val appName = parent.appName
+  private val basePath = parent.basePath
+  private var _listener: Option[EnvironmentListener] = None
+
+  lazy val listener = _listener.get
+
+  def start() {
+    _listener = Some(new EnvironmentListener)
+  }
 
   def getHandlers = Seq[ServletContextHandler](
     createServletHandler("/environment",
-      createServlet((request: HttpServletRequest) => envDetails(request), sc.env.securityManager))
+      (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
   )
 
-  def envDetails(request: HttpServletRequest): Seq[Node] = {
-    val jvmInformation = Seq(
-      ("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
-      ("Java Home", Properties.javaHome),
-      ("Scala Version", Properties.versionString),
-      ("Scala Home", Properties.scalaHome)
-    ).sorted
-    def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
-    def jvmTable =
-      UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation, fixedWidth = true)
-
-    val sparkProperties = sc.conf.getAll.sorted
-
-    val systemProperties = System.getProperties.iterator.toSeq
-    val classPathProperty = systemProperties.find { case (k, v) =>
-      k == "java.class.path"
-    }.getOrElse(("", ""))
-    val otherProperties = systemProperties.filter { case (k, v) =>
-      k != "java.class.path" && !k.startsWith("spark.")
-    }.sorted
-
-    val propertyHeaders = Seq("Name", "Value")
-    def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
-    val sparkPropertyTable =
-      UIUtils.listingTable(propertyHeaders, propertyRow, sparkProperties, fixedWidth = true)
-    val otherPropertyTable =
-      UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true)
-
-    val classPathEntries = classPathProperty._2
-        .split(sc.conf.get("path.separator", ":"))
-        .filterNot(e => e.isEmpty)
-        .map(e => (e, "System Classpath"))
-    val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
-    val addedFiles = sc.addedFiles.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
-    val classPath = (addedJars ++ addedFiles ++ classPathEntries).sorted
-
-    val classPathHeaders = Seq("Resource", "Source")
-    def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
-    val classPathTable =
-      UIUtils.listingTable(classPathHeaders, classPathRow, classPath, fixedWidth = true)
-
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val runtimeInformationTable = UIUtils.listingTable(
+      propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
+    val sparkPropertiesTable = UIUtils.listingTable(
+      propertyHeader, propertyRow, listener.sparkProperties, fixedWidth = true)
+    val systemPropertiesTable = UIUtils.listingTable(
+      propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
+    val classpathEntriesTable = UIUtils.listingTable(
+      classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
     val content =
       <span>
-        <h4>Runtime Information</h4> {jvmTable}
-        <h4>Spark Properties</h4>
-        {sparkPropertyTable}
-        <h4>System Properties</h4>
-        {otherPropertyTable}
-        <h4>Classpath Entries</h4>
-        {classPathTable}
+        <h4>Runtime Information</h4> {runtimeInformationTable}
+        <h4>Spark Properties</h4> {sparkPropertiesTable}
+        <h4>System Properties</h4> {systemPropertiesTable}
+        <h4>Classpath Entries</h4> {classpathEntriesTable}
       </span>
 
-    UIUtils.headerSparkPage(content, sc, "Environment", Environment)
+    UIUtils.headerSparkPage(content, basePath, appName, "Environment", Environment)
+  }
+
+  private def propertyHeader = Seq("Name", "Value")
+  private def classPathHeaders = Seq("Resource", "Source")
+  private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+  private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+  private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the EnvironmentUI
+ */
+private[ui] class EnvironmentListener extends SparkListener {
+  var jvmInformation = Seq[(String, String)]()
+  var sparkProperties = Seq[(String, String)]()
+  var systemProperties = Seq[(String, String)]()
+  var classpathEntries = Seq[(String, String)]()
+
+  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
+    synchronized {
+      val environmentDetails = environmentUpdate.environmentDetails
+      jvmInformation = environmentDetails("JVM Information")
+      sparkProperties = environmentDetails("Spark Properties")
+      systemProperties = environmentDetails("System Properties")
+      classpathEntries = environmentDetails("Classpath Entries")
+    }
   }
 }
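
EnvironmentListener simply caches whatever the most recent SparkListenerEnvironmentUpdate carried, keyed by the four section names it reads. A minimal sketch of feeding it directly, assuming code in the org.apache.spark.ui.env package (the listener is private[ui]) and assuming SparkListenerEnvironmentUpdate wraps a Map[String, Seq[(String, String)]], as the field access above suggests; the property values below are illustrative only:

package org.apache.spark.ui.env

import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate

object EnvironmentListenerSketch {
  def main(args: Array[String]) {
    val listener = new EnvironmentListener
    val update = SparkListenerEnvironmentUpdate(Map(
      "JVM Information"   -> Seq("Java Version" -> sys.props("java.version")),
      "Spark Properties"  -> Seq("spark.app.name" -> "demo"),
      "System Properties" -> Seq("user.dir" -> sys.props("user.dir")),
      "Classpath Entries" -> Seq("/opt/spark/jars" -> "System Classpath")
    ))
    // The listener synchronizes internally and replaces its cached sections wholesale
    listener.onEnvironmentUpdate(update)
    assert(listener.sparkProperties == Seq("spark.app.name" -> "demo"))
  }
}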