Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/19 21:17:15 UTC

[6/6] git commit: [SPARK-1132] Persisting Web UI through refactoring the SparkListener interface

[SPARK-1132] Persisting Web UI through refactoring the SparkListener interface

The fleeting nature of the Spark Web UI has long been a problem reported by many users: the existing Web UI disappears as soon as the associated application terminates. This is because SparkUI is tightly coupled with SparkContext and cannot be instantiated independently of it. To solve this, some state must be saved to persistent storage while the application is still running.

The approach taken by this PR involves persisting the UI state through SparkListenerEvents. This requires a major refactor of the SparkListener interface because the existing events (1) maintain deep references, making de/serialization difficult, and (2) do not encode all the information displayed on the UI. In this design, each existing listener for the UI (e.g. ExecutorsListener) maintains state that can be fully constructed from SparkListenerEvents. This state is then supplied to the parent UI (e.g. ExecutorsUI), which renders the associated page(s) on demand.
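
A minimal sketch of this idea, using hypothetical event and listener types rather than the real Spark classes (the actual events live in org.apache.spark.scheduler and carry considerably more information):

```scala
import scala.collection.mutable

// Simplified stand-ins for SparkListenerEvents (hypothetical, for illustration only).
sealed trait UiEvent
case class ExecutorAdded(executorId: String) extends UiEvent
case class TaskEnd(executorId: String, durationMs: Long, failed: Boolean) extends UiEvent

// In this design, a UI listener keeps only state that is fully derivable from the
// event stream, so the same state can later be rebuilt by replaying a persisted log.
class ExecutorsStateListener {
  val executors = mutable.Set[String]()
  val completedTasks = mutable.Map[String, Int]().withDefaultValue(0)
  val failedTasks = mutable.Map[String, Int]().withDefaultValue(0)
  val totalDurationMs = mutable.Map[String, Long]().withDefaultValue(0L)

  def onEvent(event: UiEvent): Unit = event match {
    case ExecutorAdded(id) =>
      executors += id
    case TaskEnd(id, durationMs, failed) =>
      executors += id
      totalDurationMs(id) += durationMs
      if (failed) failedTasks(id) += 1 else completedTasks(id) += 1
  }
}
```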

This PR introduces two important classes: the **EventLoggingListener** and the **ReplayListenerBus**. In a live application, SparkUI registers an EventLoggingListener with the SparkContext in addition to the existing listeners. Over the course of the application, this listener serializes and logs all events to persistent storage. Then, after the application has finished, the SparkUI can be revived by replaying all the logged events to the existing UI listeners through the ReplayListenerBus.
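
Reusing the sketch types above, the log-and-replay flow might look roughly like this (a hedged illustration only: the real EventLoggingListener writes JSON through FileLogger/JsonProtocol and supports HDFS and compression, and the real ReplayListenerBus feeds the deserialized events back through the SparkListener interface):

```scala
import java.io.{File, PrintWriter}
import scala.io.Source

// Serialize each event to one line of a log file while the application runs
// (the real listener writes JSON; plain comma-separated lines are used here for brevity).
class SketchEventLogger(logFile: File) {
  private val out = new PrintWriter(logFile)
  def log(event: UiEvent): Unit = event match {
    case ExecutorAdded(id)               => out.println(s"ExecutorAdded,$id")
    case TaskEnd(id, durationMs, failed) => out.println(s"TaskEnd,$id,$durationMs,$failed")
  }
  def stop(): Unit = out.close()
}

// After the application has finished, replay the log into freshly created UI
// listeners to reconstruct their state (the role played by ReplayListenerBus).
class SketchReplayBus(listeners: Seq[ExecutorsStateListener]) {
  def replay(logFile: File): Unit = {
    val source = Source.fromFile(logFile)
    try {
      for (line <- source.getLines()) {
        val parsed: Option[UiEvent] = line.split(",") match {
          case Array("ExecutorAdded", id)      => Some(ExecutorAdded(id))
          case Array("TaskEnd", id, dur, fail) => Some(TaskEnd(id, dur.toLong, fail.toBoolean))
          case _                               => None // skip unrecognized lines
        }
        parsed.foreach(event => listeners.foreach(_.onEvent(event)))
      }
    } finally {
      source.close()
    }
  }
}
```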

This feature is currently integrated with the Master Web UI, which optionally rebuilds a SparkUI from event logs as soon as the corresponding application finishes.
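
Event logging is opt-in in this patch. On the driver side, enabling it would look roughly like the following; the configuration keys are the ones read in this diff (spark.eventLog.enabled, spark.eventLog.compress), though the exact names and defaults may differ in later releases, and the master URL and app name below are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PersistedUiExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://master:7077")        // placeholder standalone master URL
      .setAppName("persisted-ui-example")
      .set("spark.eventLog.enabled", "true")   // register an EventLoggingListener
      .set("spark.eventLog.compress", "true")  // optional: compress the event logs

    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 1000).map(_ * 2).count()  // any workload; its events get logged
    } finally {
      // Once the application finishes, the Master web UI can rebuild its SparkUI
      // from the persisted event logs.
      sc.stop()
    }
  }
}
```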

More details can be found in the commit messages, comments within the code, and the [design doc](https://spark-project.atlassian.net/secure/attachment/12900/PersistingSparkWebUI.pdf). Comments and feedback are most welcome.

Author: Andrew Or <an...@gmail.com>
Author: andrewor14 <an...@gmail.com>

Closes #42 from andrewor14/master and squashes the following commits:

e5f14fa [Andrew Or] Merge github.com:apache/spark
a1c5cd9 [Andrew Or] Merge github.com:apache/spark
b8ba817 [Andrew Or] Remove UI from map when removing application in Master
83af656 [Andrew Or] Scraps and pieces (no functionality change)
222adcd [Andrew Or] Merge github.com:apache/spark
124429f [Andrew Or] Clarify LiveListenerBus behavior + Add tests for new behavior
f80bd31 [Andrew Or] Simplify static handler and BlockManager status update logic
9e14f97 [Andrew Or] Moved around functionality + renamed classes per Patrick
6740e49 [Andrew Or] Fix comment nits
650eb12 [Andrew Or] Add unit tests + Fix bugs found through tests
45fd84c [Andrew Or] Remove now deprecated test
c5c2c8f [Andrew Or] Remove list of (TaskInfo, TaskMetrics) from StageInfo
3456090 [Andrew Or] Address Patrick's comments
bf80e3d [Andrew Or] Imports, comments, and code formatting, once again (minor)
ac69ec8 [Andrew Or] Fix test fail
d801d11 [Andrew Or] Merge github.com:apache/spark (major)
dc93915 [Andrew Or] Imports, comments, and code formatting (minor)
77ba283 [Andrew Or] Address Kay's and Patrick's comments
b6eaea7 [Andrew Or] Treating SparkUI as a handler of MasterUI
d59da5f [Andrew Or] Avoid logging all the blocks on each executor
d6e3b4a [Andrew Or] Merge github.com:apache/spark
ca258a4 [Andrew Or] Master UI - add support for reading compressed event logs
176e68e [Andrew Or] Fix deprecated message for JavaSparkContext (minor)
4f69c4a [Andrew Or] Master UI - Rebuild SparkUI on application finish
291b2be [Andrew Or] Correct directory in log message "INFO: Logging events to <dir>"
1ba3407 [Andrew Or] Add a few configurable options to event logging
e375431 [Andrew Or] Add new constructors for SparkUI
18b256d [Andrew Or] Refactor out event logging and replaying logic from UI
bb4c503 [Andrew Or] Use a more mnemonic path for logging
aef411c [Andrew Or] Fix bug: storage status was not reflected on UI in the local case
03eda0b [Andrew Or] Fix HDFS flush behavior
36b3e5d [Andrew Or] Add HDFS support for event logging
cceff2b [andrewor14] Fix 100 char format fail
2fee310 [Andrew Or] Address Patrick's comments
2981d61 [Andrew Or] Move SparkListenerBus out of DAGScheduler + Clean up
5d2cec1 [Andrew Or] JobLogger: ID -> Id
0503e4b [Andrew Or] Fix PySpark tests + remove sc.clearFiles/clearJars
4d2fb0c [Andrew Or] Fix format fail
faa113e [Andrew Or] General clean up
d47585f [Andrew Or] Clean up FileLogger
472fd8a [Andrew Or] Fix a couple of tests
996d7a2 [Andrew Or] Reflect RDD unpersist on UI
7b2f811 [Andrew Or] Guard against TaskMetrics NPE + Fix tests
d1f4285 [Andrew Or] Migrate from lift-json to json4s-jackson
28019ca [Andrew Or] Merge github.com:apache/spark
bbe3501 [Andrew Or] Embed storage status and RDD info in Task events
6631c02 [Andrew Or] More formatting changes, this time mainly for Json DSL
70e7e7a [Andrew Or] Formatting changes
e9e1c6d [Andrew Or] Move all JSON de/serialization logic to JsonProtocol
d646df6 [Andrew Or] Completely decouple SparkUI from SparkContext
6814da0 [Andrew Or] Explicitly register each UI listener rather than through some magic
64d2ce1 [Andrew Or] Fix BlockManagerUI bug by introducing new event
4273013 [Andrew Or] Add a gateway SparkListener to simplify event logging
904c729 [Andrew Or] Fix another major bug
5ac906d [Andrew Or] Mostly naming, formatting, and code style changes
3fd584e [Andrew Or] Fix two major bugs
f3fc13b [Andrew Or] General refactor
4dfcd22 [Andrew Or] Merge git://git.apache.org/incubator-spark into persist-ui
b3976b0 [Andrew Or] Add functionality of reconstructing a persisted UI from SparkContext
8add36b [Andrew Or] JobProgressUI: Add JSON functionality
d859efc [Andrew Or] BlockManagerUI: Add JSON functionality
c4cd480 [Andrew Or] Also deserialize new events
8a2ebe6 [Andrew Or] Fix bugs for EnvironmentUI and ExecutorsUI
de8a1cd [Andrew Or] Serialize events both to and from JSON (rather than just to)
bf0b2e9 [Andrew Or] ExecutorUI: Serialize events rather than arbitrary executor information
bb222b9 [Andrew Or] ExecutorUI: render completely from JSON
dcbd312 [Andrew Or] Add JSON Serializability for all SparkListenerEvent's
10ed49d [Andrew Or] Merge github.com:apache/incubator-spark into persist-ui
8e09306 [Andrew Or] Use JSON for ExecutorsUI
e3ae35f [Andrew Or] Merge github.com:apache/incubator-spark
3ddeb7e [Andrew Or] Also privatize fields
090544a [Andrew Or] Privatize methods
13920c9 [Andrew Or] Update docs
bd5a1d7 [Andrew Or] Typo: phyiscal -> physical
287ef44 [Andrew Or] Avoid reading the entire batch into memory; also simplify streaming logic
3df7005 [Andrew Or] Merge branch 'master' of github.com:andrewor14/incubator-spark
a531d2e [Andrew Or] Relax assumptions on compressors and serializers when batching
164489d [Andrew Or] Relax assumptions on compressors and serializers when batching


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79d07d66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79d07d66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79d07d66

Branch: refs/heads/master
Commit: 79d07d66040f206708e14de393ab0b80020ed96a
Parents: ab747d3
Author: Andrew Or <an...@gmail.com>
Authored: Wed Mar 19 13:17:01 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Mar 19 13:17:01 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |   2 -
 .../scala/org/apache/spark/CacheManager.scala   |  76 +-
 .../org/apache/spark/SecurityManager.scala      |   8 +-
 .../scala/org/apache/spark/SparkContext.scala   |  93 ++-
 .../main/scala/org/apache/spark/SparkEnv.scala  |  80 ++-
 .../scala/org/apache/spark/TaskEndReason.scala  |   3 +-
 .../spark/api/java/JavaSparkContext.scala       |   2 +
 .../spark/deploy/ApplicationDescription.scala   |   5 +-
 .../org/apache/spark/deploy/JsonProtocol.scala  |   1 -
 .../spark/deploy/master/ApplicationInfo.scala   |   6 -
 .../org/apache/spark/deploy/master/Master.scala |  73 +-
 .../deploy/master/ui/ApplicationPage.scala      |   3 +-
 .../spark/deploy/master/ui/IndexPage.scala      |   3 +-
 .../spark/deploy/master/ui/MasterWebUI.scala    |  84 ++-
 .../org/apache/spark/deploy/worker/Worker.scala |  18 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |  84 +--
 .../org/apache/spark/executor/Executor.scala    |   8 +-
 .../org/apache/spark/executor/TaskMetrics.scala |  13 +-
 .../org/apache/spark/io/CompressionCodec.scala  |   5 +-
 .../spark/metrics/sink/MetricsServlet.scala     |   8 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   6 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 306 ++++----
 .../spark/scheduler/DAGSchedulerEvent.scala     |   2 +-
 .../spark/scheduler/DAGSchedulerSource.scala    |   6 +-
 .../spark/scheduler/EventLoggingListener.scala  |  98 +++
 .../org/apache/spark/scheduler/JobLogger.scala  | 266 ++-----
 .../org/apache/spark/scheduler/JobResult.scala  |   5 +-
 .../org/apache/spark/scheduler/JobWaiter.scala  |   2 +-
 .../spark/scheduler/LiveListenerBus.scala       | 101 +++
 .../scala/org/apache/spark/scheduler/Pool.scala |   2 +-
 .../spark/scheduler/ReplayListenerBus.scala     | 104 +++
 .../apache/spark/scheduler/SparkListener.scala  | 176 +++--
 .../spark/scheduler/SparkListenerBus.scala      | 167 ++---
 .../org/apache/spark/scheduler/StageInfo.scala  |  29 +-
 .../apache/spark/scheduler/TaskLocality.scala   |   2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |   6 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   9 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |   5 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |   5 +-
 .../org/apache/spark/storage/BlockManager.scala | 241 ++++---
 .../apache/spark/storage/BlockManagerId.scala   |   3 +-
 .../spark/storage/BlockManagerMaster.scala      |   6 +-
 .../spark/storage/BlockManagerMasterActor.scala | 160 ++---
 .../org/apache/spark/storage/MemoryStore.scala  | 100 +--
 .../org/apache/spark/storage/PutResult.scala    |  12 +-
 .../spark/storage/StorageStatusListener.scala   |  94 +++
 .../org/apache/spark/storage/StorageUtils.scala | 119 ++--
 .../apache/spark/storage/ThreadingTest.scala    |   7 +-
 .../scala/org/apache/spark/ui/JettyUtils.scala  | 121 ++--
 .../scala/org/apache/spark/ui/SparkUI.scala     | 101 ++-
 .../scala/org/apache/spark/ui/UIUtils.scala     |  40 +-
 .../org/apache/spark/ui/env/EnvironmentUI.scala | 108 ++-
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  | 224 +++---
 .../apache/spark/ui/jobs/ExecutorSummary.scala  |   2 +-
 .../apache/spark/ui/jobs/ExecutorTable.scala    |  48 +-
 .../org/apache/spark/ui/jobs/IndexPage.scala    |  90 +--
 .../spark/ui/jobs/JobProgressListener.scala     | 277 +++++---
 .../apache/spark/ui/jobs/JobProgressUI.scala    |  32 +-
 .../org/apache/spark/ui/jobs/PoolPage.scala     |  29 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala    |  25 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    | 248 +++----
 .../org/apache/spark/ui/jobs/StageTable.scala   |  75 +-
 .../spark/ui/storage/BlockManagerUI.scala       |  74 +-
 .../org/apache/spark/ui/storage/IndexPage.scala |  41 +-
 .../org/apache/spark/ui/storage/RDDPage.scala   |  86 ++-
 .../org/apache/spark/util/FileLogger.scala      | 165 +++++
 .../org/apache/spark/util/JsonProtocol.scala    | 710 +++++++++++++++++++
 .../scala/org/apache/spark/util/Utils.scala     |  38 +-
 .../java/org/apache/spark/JavaAPISuite.java     |  23 +-
 .../org/apache/spark/CacheManagerSuite.scala    |  11 +-
 .../org/apache/spark/JobCancellationSuite.scala |   8 +-
 .../scala/org/apache/spark/PipedRDDSuite.scala  |   5 +-
 .../SparkContextSchedulerCreationSuite.scala    |   4 +-
 .../apache/spark/deploy/JsonProtocolSuite.scala |  11 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |  15 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala | 126 ----
 .../spark/scheduler/SparkListenerSuite.scala    | 150 ++--
 .../scheduler/TaskSchedulerImplSuite.scala      |   8 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   |   8 +-
 .../spark/storage/BlockManagerSuite.scala       |  69 +-
 .../scala/org/apache/spark/ui/UISuite.scala     |  22 +-
 .../ui/jobs/JobProgressListenerSuite.scala      |  20 +-
 .../apache/spark/util/JsonProtocolSuite.scala   | 559 +++++++++++++++
 docs/configuration.md                           |  25 +-
 84 files changed, 4268 insertions(+), 1944 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index c4579cf..ceead59 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark
 
-import scala.{Option, deprecated}
-
 import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 872e892..c7893f2 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -20,11 +20,12 @@ package org.apache.spark
 import scala.collection.mutable.{ArrayBuffer, HashSet}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus, RDDBlockId, StorageLevel}
 
-/** Spark class responsible for passing RDDs split contents to the BlockManager and making
-    sure a node doesn't load two copies of an RDD at once.
-  */
+/**
+ * Spark class responsible for passing RDDs split contents to the BlockManager and making
+ * sure a node doesn't load two copies of an RDD at once.
+ */
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
 
   /** Keys of RDD splits that are being computed/loaded. */
@@ -49,11 +50,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
               try {loading.wait()} catch {case _ : Throwable =>}
             }
             logInfo("Finished waiting for %s".format(key))
-            // See whether someone else has successfully loaded it. The main way this would fail
-            // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
-            // partition but we didn't want to make space for it. However, that case is unlikely
-            // because it's unlikely that two threads would work on the same RDD partition. One
-            // downside of the current code is that threads wait serially if this does happen.
+            /* See whether someone else has successfully loaded it. The main way this would fail
+             * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
+             * partition but we didn't want to make space for it. However, that case is unlikely
+             * because it's unlikely that two threads would work on the same RDD partition. One
+             * downside of the current code is that threads wait serially if this does happen. */
             blockManager.get(key) match {
               case Some(values) =>
                 return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
@@ -69,32 +70,45 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           // If we got here, we have to load the split
           logInfo("Partition %s not found, computing it".format(key))
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
+
           // Persist the result, so long as the task is not running locally
           if (context.runningLocally) { return computedValues }
-          if (storageLevel.useDisk && !storageLevel.useMemory) {
-            // In the case that this RDD is to be persisted using DISK_ONLY
-            // the iterator will be passed directly to the blockManager (rather then
-            // caching it to an ArrayBuffer first), then the resulting block data iterator
-            // will be passed back to the user. If the iterator generates a lot of data,
-            // this means that it doesn't all have to be held in memory at one time.
-            // This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
-            // blocks aren't dropped by the block store before enabling that.
-            blockManager.put(key, computedValues, storageLevel, tellMaster = true)
-            return blockManager.get(key) match {
-              case Some(values) =>
-                return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
-              case None =>
-                logInfo("Failure to store %s".format(key))
-                throw new Exception("Block manager failed to return persisted valued")
+
+          // Keep track of blocks with updated statuses
+          var updatedBlocks = Seq[(BlockId, BlockStatus)]()
+          val returnValue: Iterator[T] = {
+            if (storageLevel.useDisk && !storageLevel.useMemory) {
+              /* In the case that this RDD is to be persisted using DISK_ONLY
+               * the iterator will be passed directly to the blockManager (rather then
+               * caching it to an ArrayBuffer first), then the resulting block data iterator
+               * will be passed back to the user. If the iterator generates a lot of data,
+               * this means that it doesn't all have to be held in memory at one time.
+               * This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
+               * blocks aren't dropped by the block store before enabling that. */
+              updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
+              blockManager.get(key) match {
+                case Some(values) =>
+                  new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+                case None =>
+                  logInfo("Failure to store %s".format(key))
+                  throw new Exception("Block manager failed to return persisted valued")
+              }
+            } else {
+              // In this case the RDD is cached to an array buffer. This will save the results
+              // if we're dealing with a 'one-time' iterator
+              val elements = new ArrayBuffer[Any]
+              elements ++= computedValues
+              updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
+              elements.iterator.asInstanceOf[Iterator[T]]
             }
-          } else {
-            // In this case the RDD is cached to an array buffer. This will save the results
-            // if we're dealing with a 'one-time' iterator
-            val elements = new ArrayBuffer[Any]
-            elements ++= computedValues
-            blockManager.put(key, elements, storageLevel, tellMaster = true)
-            return elements.iterator.asInstanceOf[Iterator[T]]
           }
+
+          // Update task metrics to include any blocks whose storage status is updated
+          val metrics = context.taskMetrics
+          metrics.updatedBlocks = Some(updatedBlocks)
+
+          returnValue
+
         } finally {
           loading.synchronized {
             loading.remove(key)

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 591978c..2237ee3 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -18,13 +18,13 @@
 package org.apache.spark
 
 import java.net.{Authenticator, PasswordAuthentication}
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.deploy.SparkHadoopUtil
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.io.Text
+
+import org.apache.spark.deploy.SparkHadoopUtil
+
 /** 
  * Spark class responsible for security. 
  * 

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 852ed8f..a1003b7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -132,6 +132,9 @@ class SparkContext(
 
   if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
 
+  // An asynchronous listener bus for Spark events
+  private[spark] val listenerBus = new LiveListenerBus
+
   // Create the Spark execution environment (cache, map output tracker, etc)
   private[spark] val env = SparkEnv.create(
     conf,
@@ -139,7 +142,8 @@ class SparkContext(
     conf.get("spark.driver.host"),
     conf.get("spark.driver.port").toInt,
     isDriver = true,
-    isLocal = isLocal)
+    isLocal = isLocal,
+    listenerBus = listenerBus)
   SparkEnv.set(env)
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
@@ -151,9 +155,26 @@ class SparkContext(
   private[spark] val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
 
-  // Initialize the Spark UI
+  // Initialize the Spark UI, registering all associated listeners
   private[spark] val ui = new SparkUI(this)
   ui.bind()
+  ui.start()
+
+  // Optionally log Spark events
+  private[spark] val eventLogger: Option[EventLoggingListener] = {
+    if (conf.getBoolean("spark.eventLog.enabled", false)) {
+      val logger = new EventLoggingListener(appName, conf)
+      listenerBus.addListener(logger)
+      Some(logger)
+    } else None
+  }
+
+  // Information needed to replay logged events, if any
+  private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
+    eventLogger.map { logger => Some(logger.info) }.getOrElse(None)
+
+  // At this point, all relevant SparkListeners have been registered, so begin releasing events
+  listenerBus.start()
 
   val startTime = System.currentTimeMillis()
 
@@ -200,13 +221,13 @@ class SparkContext(
   executorEnvs("SPARK_USER") = sparkUser
 
   // Create and start the scheduler
-  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
+  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
   taskScheduler.start()
 
-  @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
+  @volatile private[spark] var dagScheduler = new DAGScheduler(this)
   dagScheduler.start()
 
-  ui.start()
+  postEnvironmentUpdate()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
@@ -571,7 +592,6 @@ class SparkContext(
       .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
   }
 
-
   protected[spark] def checkpointFile[T: ClassTag](
       path: String
     ): RDD[T] = {
@@ -641,10 +661,11 @@ class SparkContext(
     Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
+    postEnvironmentUpdate()
   }
 
   def addSparkListener(listener: SparkListener) {
-    dagScheduler.addSparkListener(listener)
+    listenerBus.addListener(listener)
   }
 
   /**
@@ -671,7 +692,7 @@ class SparkContext(
    */
   def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
 
-  def getStageInfo: Map[Stage,StageInfo] = {
+  def getStageInfo: Map[Stage, StageInfo] = {
     dagScheduler.stageToInfos
   }
 
@@ -698,7 +719,7 @@ class SparkContext(
   }
 
   /**
-   *  Return current scheduling mode
+   * Return current scheduling mode
    */
   def getSchedulingMode: SchedulingMode.SchedulingMode = {
     taskScheduler.schedulingMode
@@ -708,6 +729,7 @@ class SparkContext(
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
    */
+  @deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0")
   def clearFiles() {
     addedFiles.clear()
   }
@@ -723,6 +745,23 @@ class SparkContext(
   }
 
   /**
+   * Register an RDD to be persisted in memory and/or disk storage
+   */
+  private[spark] def persistRDD(rdd: RDD[_]) {
+    persistentRdds(rdd.id) = rdd
+  }
+
+  /**
+   * Unpersist an RDD from memory and/or disk storage
+   */
+  private[spark] def unpersistRDD(rdd: RDD[_], blocking: Boolean = true) {
+    val rddId = rdd.id
+    env.blockManager.master.removeRdd(rddId, blocking)
+    persistentRdds.remove(rddId)
+    listenerBus.post(SparkListenerUnpersistRDD(rddId))
+  }
+
+  /**
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
    * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
@@ -744,7 +783,7 @@ class SparkContext(
             if (SparkHadoopUtil.get.isYarnMode() &&
                 (master == "yarn-standalone" || master == "yarn-cluster")) {
               // In order for this to work in yarn-cluster mode the user must specify the
-              // --addjars option to the client to upload the file into the distributed cache 
+              // --addjars option to the client to upload the file into the distributed cache
               // of the AM to make it show up in the current working directory.
               val fileName = new Path(uri.getPath).getName()
               try {
@@ -752,7 +791,7 @@ class SparkContext(
               } catch {
                 case e: Exception => {
                   // For now just log an error but allow to go through so spark examples work.
-                  // The spark examples don't really need the jar distributed since its also 
+                  // The spark examples don't really need the jar distributed since its also
                   // the app jar.
                   logError("Error adding jar (" + e + "), was the --addJars option used?")
                   null
@@ -773,12 +812,14 @@ class SparkContext(
         logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
       }
     }
+    postEnvironmentUpdate()
   }
 
   /**
    * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
    * any new nodes.
    */
+  @deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0")
   def clearJars() {
     addedJars.clear()
   }
@@ -786,6 +827,7 @@ class SparkContext(
   /** Shut down the SparkContext. */
   def stop() {
     ui.stop()
+    eventLogger.foreach(_.stop())
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
@@ -793,12 +835,10 @@ class SparkContext(
     if (dagSchedulerCopy != null) {
       metadataCleaner.cancel()
       dagSchedulerCopy.stop()
+      listenerBus.stop()
       taskScheduler = null
       // TODO: Cache.stop()?
       env.stop()
-      // Clean up locally linked files
-      clearFiles()
-      clearJars()
       SparkEnv.set(null)
       ShuffleMapTask.clearCache()
       ResultTask.clearCache()
@@ -1026,6 +1066,19 @@ class SparkContext(
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
 
+  /** Post the environment update event once the task scheduler is ready */
+  private def postEnvironmentUpdate() {
+    if (taskScheduler != null) {
+      val schedulingMode = getSchedulingMode.toString
+      val addedJarPaths = addedJars.keys.toSeq
+      val addedFilePaths = addedFiles.keys.toSeq
+      val environmentDetails =
+        SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
+      val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
+      listenerBus.post(environmentUpdate)
+    }
+  }
+
   /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
   private[spark] def cleanup(cleanupTime: Long) {
     persistentRdds.clearOldValues(cleanupTime)
@@ -1189,9 +1242,7 @@ object SparkContext extends Logging {
   }
 
   /** Creates a task scheduler based on a given master URL. Extracted for testing. */
-  private def createTaskScheduler(sc: SparkContext, master: String, appName: String)
-      : TaskScheduler =
-  {
+  private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
     // Regular expression used for local[N] master format
     val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1230,7 +1281,7 @@ object SparkContext extends Logging {
       case SPARK_REGEX(sparkUrl) =>
         val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
-        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
         scheduler.initialize(backend)
         scheduler
 
@@ -1247,7 +1298,7 @@ object SparkContext extends Logging {
         val localCluster = new LocalSparkCluster(
           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
         val masterUrls = localCluster.start()
-        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
         scheduler.initialize(backend)
         backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
           localCluster.stop()
@@ -1307,9 +1358,9 @@ object SparkContext extends Logging {
         val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
-          new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
+          new CoarseMesosSchedulerBackend(scheduler, sc, url)
         } else {
-          new MesosSchedulerBackend(scheduler, sc, url, appName)
+          new MesosSchedulerBackend(scheduler, sc, url)
         }
         scheduler.initialize(backend)
         scheduler

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 774cbd6..a1af63f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.concurrent.Await
+import scala.util.Properties
 
 import akka.actor._
 import com.google.common.collect.MapMaker
@@ -26,9 +28,10 @@ import com.google.common.collect.MapMaker
 import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.storage.{BlockManager, BlockManagerMaster, BlockManagerMasterActor}
 import org.apache.spark.network.ConnectionManager
+import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
@@ -49,11 +52,11 @@ class SparkEnv private[spark] (
     val broadcastManager: BroadcastManager,
     val blockManager: BlockManager,
     val connectionManager: ConnectionManager,
+    val securityManager: SecurityManager,
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
-    val conf: SparkConf,
-    val securityManager: SecurityManager) extends Logging {
+    val conf: SparkConf) extends Logging {
 
   // A mapping of thread ID to amount of memory used for shuffle in bytes
   // All accesses should be manually synchronized
@@ -120,9 +123,16 @@ object SparkEnv extends Logging {
       hostname: String,
       port: Int,
       isDriver: Boolean,
-      isLocal: Boolean): SparkEnv = {
+      isLocal: Boolean,
+      listenerBus: LiveListenerBus = null): SparkEnv = {
+
+    // Listener bus is only used on the driver
+    if (isDriver) {
+      assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
+    }
 
     val securityManager = new SecurityManager(conf)
+
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
       securityManager = securityManager)
 
@@ -172,8 +182,9 @@ object SparkEnv extends Logging {
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
-      new BlockManagerMasterActor(isLocal, conf)), conf)
-    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, 
+      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
+
+    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
       serializer, conf, securityManager)
 
     val connectionManager = blockManager.connectionManager
@@ -233,10 +244,63 @@ object SparkEnv extends Logging {
       broadcastManager,
       blockManager,
       connectionManager,
+      securityManager,
       httpFileServer,
       sparkFilesDir,
       metricsSystem,
-      conf,
-      securityManager)
+      conf)
+  }
+
+  /**
+   * Return a map representation of jvm information, Spark properties, system properties, and
+   * class paths. Map keys define the category, and map values represent the corresponding
+   * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
+   */
+  private[spark]
+  def environmentDetails(
+      conf: SparkConf,
+      schedulingMode: String,
+      addedJars: Seq[String],
+      addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
+
+    val jvmInformation = Seq(
+      ("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
+      ("Java Home", Properties.javaHome),
+      ("Scala Version", Properties.versionString),
+      ("Scala Home", Properties.scalaHome)
+    ).sorted
+
+    // Spark properties
+    // This includes the scheduling mode whether or not it is configured (used by SparkUI)
+    val schedulerMode =
+      if (!conf.contains("spark.scheduler.mode")) {
+        Seq(("spark.scheduler.mode", schedulingMode))
+      } else {
+        Seq[(String, String)]()
+      }
+    val sparkProperties = (conf.getAll ++ schedulerMode).sorted
+
+    // System properties that are not java classpaths
+    val systemProperties = System.getProperties.iterator.toSeq
+    val otherProperties = systemProperties.filter { case (k, v) =>
+      k != "java.class.path" && !k.startsWith("spark.")
+    }.sorted
+
+    // Class paths including all added jars and files
+    val classPathProperty = systemProperties.find { case (k, v) =>
+      k == "java.class.path"
+    }.getOrElse(("", ""))
+    val classPathEntries = classPathProperty._2
+      .split(conf.get("path.separator", ":"))
+      .filterNot(e => e.isEmpty)
+      .map(e => (e, "System Classpath"))
+    val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
+    val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
+
+    Map[String, Seq[(String, String)]](
+      "JVM Information" -> jvmInformation,
+      "Spark Properties" -> sparkProperties,
+      "System Properties" -> otherProperties,
+      "Classpath Entries" -> classPaths)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 3fd6f5e..f1a753b 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -29,7 +29,7 @@ private[spark] sealed trait TaskEndReason
 
 private[spark] case object Success extends TaskEndReason
 
-private[spark] 
+private[spark]
 case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
 
 private[spark] case class FetchFailed(
@@ -65,4 +65,3 @@ private[spark] case object ExecutorLostFailure extends TaskEndReason
  * deserializing the task result.
  */
 private[spark] case object UnknownReason extends TaskEndReason
-

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 8e0eab5..35508b6 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -434,6 +434,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
    * any new nodes.
    */
+  @deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0")
   def clearJars() {
     sc.clearJars()
   }
@@ -442,6 +443,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
    */
+  @deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0")
   def clearFiles() {
     sc.clearFiles()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 449b953..15fa8a7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -17,13 +17,16 @@
 
 package org.apache.spark.deploy
 
+import org.apache.spark.scheduler.EventLoggingInfo
+
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
     val sparkHome: Option[String],
-    val appUiUrl: String)
+    var appUiUrl: String,
+    val eventLogInfo: Option[EventLoggingInfo] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index cefb1ff..c4f5e29 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -43,7 +43,6 @@ private[spark] object JsonProtocol {
     ("starttime" -> obj.startTime) ~
     ("id" -> obj.id) ~
     ("name" -> obj.desc.name) ~
-    ("appuiurl" -> obj.appUiUrl) ~
     ("cores" -> obj.desc.maxCores) ~
     ("user" ->  obj.desc.user) ~
     ("memoryperslave" -> obj.desc.memoryPerSlave) ~

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index e8867bc..46b9f4d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -31,7 +31,6 @@ private[spark] class ApplicationInfo(
     val desc: ApplicationDescription,
     val submitDate: Date,
     val driver: ActorRef,
-    val appUiUrl: String,
     defaultCores: Int)
   extends Serializable {
 
@@ -45,11 +44,6 @@ private[spark] class ApplicationInfo(
 
   init()
 
-  private def readObject(in: java.io.ObjectInputStream) : Unit = {
-    in.defaultReadObject()
-    init()
-  }
-
   private def init() {
     state = ApplicationState.WAITING
     executors = new mutable.HashMap[Int, ExecutorInfo]

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b8dfa44..1fd2114 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -37,10 +37,16 @@ import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.scheduler.ReplayListenerBus
+import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{AkkaUtils, Utils}
 
-private[spark] class Master(host: String, port: Int, webUiPort: Int,
+private[spark] class Master(
+    host: String,
+    port: Int,
+    webUiPort: Int,
     val securityMgr: SecurityManager) extends Actor with Logging {
+
   import context.dispatcher   // to use Akka's scheduler.schedule()
 
   val conf = new SparkConf
@@ -64,6 +70,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
   val completedApps = new ArrayBuffer[ApplicationInfo]
   var nextAppNumber = 0
 
+  val appIdToUI = new HashMap[String, SparkUI]
+
   val drivers = new HashSet[DriverInfo]
   val completedDrivers = new ArrayBuffer[DriverInfo]
   val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling
@@ -107,8 +115,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
     logInfo("Starting Spark master at " + masterUrl)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    webUi.start()
-    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
+    webUi.bind()
+    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
 
     masterMetricsSystem.registerSource(masterSource)
@@ -141,6 +149,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
 
   override def postStop() {
     webUi.stop()
+    appIdToUI.values.foreach(_.stop())
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
@@ -373,7 +382,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
     }
 
     case RequestWebUIPort => {
-      sender ! WebUIPortResponse(webUi.boundPort.getOrElse(-1))
+      sender ! WebUIPortResponse(webUi.boundPort)
     }
   }
 
@@ -581,8 +590,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
   def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
     val now = System.currentTimeMillis()
     val date = new Date(now)
-    new ApplicationInfo(
-      now, newApplicationId(date), desc, date, driver, desc.appUiUrl, defaultCores)
+    new ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores)
   }
 
   def registerApplication(app: ApplicationInfo): Unit = {
@@ -614,12 +622,27 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
       if (completedApps.size >= RETAINED_APPLICATIONS) {
         val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
         completedApps.take(toRemove).foreach( a => {
+          appIdToUI.remove(a.id).foreach { ui =>
+            ui.stop()
+            webUi.detachUI(ui)
+          }
           applicationMetricsSystem.removeSource(a.appSource)
         })
         completedApps.trimStart(toRemove)
       }
       completedApps += app // Remember it in our history
       waitingApps -= app
+
+      // If application events are logged, use them to rebuild the UI
+      startPersistedSparkUI(app).map { ui =>
+        app.desc.appUiUrl = ui.basePath
+        appIdToUI(app.id) = ui
+        webUi.attachUI(ui)
+      }.getOrElse {
+        // Avoid broken links if the UI is not reconstructed
+        app.desc.appUiUrl = ""
+      }
+
       for (exec <- app.executors.values) {
         exec.worker.removeExecutor(exec)
         exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id)
@@ -634,6 +657,36 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
     }
   }
 
+  /**
+   * Start a new SparkUI rendered from persisted storage. If this is unsuccessful for any reason,
+   * return None. Otherwise return the reconstructed UI.
+   */
+  def startPersistedSparkUI(app: ApplicationInfo): Option[SparkUI] = {
+    val appName = app.desc.name
+    val eventLogInfo = app.desc.eventLogInfo.getOrElse { return None }
+    val eventLogDir = eventLogInfo.logDir
+    val eventCompressionCodec = eventLogInfo.compressionCodec
+    val appConf = new SparkConf
+    eventCompressionCodec.foreach { codec =>
+      appConf.set("spark.eventLog.compress", "true")
+      appConf.set("spark.io.compression.codec", codec)
+    }
+    val replayerBus = new ReplayListenerBus(appConf)
+    val ui = new SparkUI(
+      appConf,
+      replayerBus,
+      "%s (finished)".format(appName),
+      "/history/%s".format(app.id))
+
+    // Do not call ui.bind() to avoid creating a new server for each application
+    ui.start()
+    val success = replayerBus.replay(eventLogDir)
+    if (!success) {
+      ui.stop()
+      None
+    } else Some(ui)
+  }
+
   /** Generate a new app ID given a app's submission date */
   def newApplicationId(submitDate: Date): String = {
     val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
@@ -717,9 +770,11 @@ private[spark] object Master {
     }
   }
 
-  def startSystemAndActor(host: String, port: Int, webUiPort: Int, conf: SparkConf)
-      : (ActorSystem, Int, Int) =
-  {
+  def startSystemAndActor(
+      host: String,
+      port: Int,
+      webUiPort: Int,
+      conf: SparkConf): (ActorSystem, Int, Int) = {
     val securityMgr = new SecurityManager(conf)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
       securityManager = securityMgr)

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 90cad3c..cb092cb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -23,7 +23,6 @@ import scala.concurrent.Await
 import scala.xml.Node
 
 import akka.pattern.ask
-import javax.servlet.http.HttpServletRequest
 import org.json4s.JValue
 
 import org.apache.spark.deploy.JsonProtocol
@@ -83,7 +82,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
               </li>
               <li><strong>Submit Date:</strong> {app.submitDate}</li>
               <li><strong>State:</strong> {app.state}</li>
-              <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li>
+              <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
             </ul>
           </div>
         </div>

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 3233cd9..7ec71eb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -23,7 +23,6 @@ import scala.concurrent.Await
 import scala.xml.Node
 
 import akka.pattern.ask
-import javax.servlet.http.HttpServletRequest
 import org.json4s.JValue
 
 import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
@@ -162,7 +161,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
         <a href={"app?appId=" + app.id}>{app.id}</a>
       </td>
       <td>
-        <a href={app.appUiUrl}>{app.desc.name}</a>
+        <a href={app.desc.appUiUrl}>{app.desc.name}</a>
       </td>
       <td>
         {app.coresGranted}

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 4ad1f95..bd75b2d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -18,12 +18,12 @@
 package org.apache.spark.deploy.master.ui
 
 import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.server.Server
+
 import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.ui.JettyUtils
+import org.apache.spark.ui.{ServerInfo, SparkUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -32,24 +32,35 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 private[spark]
 class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
-  val timeout = AkkaUtils.askTimeout(master.conf)
-  val host = Utils.localHostName()
-  val port = requestedPort
-
   val masterActorRef = master.self
+  val timeout = AkkaUtils.askTimeout(master.conf)
 
-  var server: Option[Server] = None
-  var boundPort: Option[Int] = None
+  private val host = Utils.localHostName()
+  private val port = requestedPort
+  private val applicationPage = new ApplicationPage(this)
+  private val indexPage = new IndexPage(this)
+  private var serverInfo: Option[ServerInfo] = None
 
-  val applicationPage = new ApplicationPage(this)
-  val indexPage = new IndexPage(this)
+  private val handlers: Seq[ServletContextHandler] = {
+    master.masterMetricsSystem.getServletHandlers ++
+    master.applicationMetricsSystem.getServletHandlers ++
+    Seq[ServletContextHandler](
+      createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"),
+      createServletHandler("/app/json",
+        (request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr),
+      createServletHandler("/app",
+        (request: HttpServletRequest) => applicationPage.render(request), master.securityMgr),
+      createServletHandler("/json",
+        (request: HttpServletRequest) => indexPage.renderJson(request), master.securityMgr),
+      createServletHandler("/",
+        (request: HttpServletRequest) => indexPage.render(request), master.securityMgr)
+    )
+  }
 
-  def start() {
+  def bind() {
     try {
-      val (srv, bPort) = JettyUtils.startJettyServer(host, port, handlers, master.conf)
-      server = Some(srv)
-      boundPort = Some(bPort)
-      logInfo("Started Master web UI at http://%s:%d".format(host, boundPort.get))
+      serverInfo = Some(startJettyServer(host, port, handlers, master.conf))
+      logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>
         logError("Failed to create Master JettyUtils", e)
@@ -57,27 +68,38 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
     }
   }
 
-  val metricsHandlers = master.masterMetricsSystem.getServletHandlers ++
-    master.applicationMetricsSystem.getServletHandlers
+  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
 
-  val handlers = metricsHandlers ++ Seq[ServletContextHandler](
-    createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR + "/static", "/static"),
-    createServletHandler("/app/json",
-      createServlet((request: HttpServletRequest) => applicationPage.renderJson(request),
-        master.securityMgr)),
-    createServletHandler("/app", createServlet((request: HttpServletRequest) => applicationPage
-      .render(request), master.securityMgr)),
-    createServletHandler("/json", createServlet((request: HttpServletRequest) => indexPage
-      .renderJson(request), master.securityMgr)),
-    createServletHandler("*", createServlet((request: HttpServletRequest) => indexPage.render
-      (request), master.securityMgr))
-  )
+  /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
+  def attachUI(ui: SparkUI) {
+    assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
+    val rootHandler = serverInfo.get.rootHandler
+    for (handler <- ui.handlers) {
+      rootHandler.addHandler(handler)
+      if (!handler.isStarted) {
+        handler.start()
+      }
+    }
+  }
+
+  /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
+  def detachUI(ui: SparkUI) {
+    assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
+    val rootHandler = serverInfo.get.rootHandler
+    for (handler <- ui.handlers) {
+      if (handler.isStarted) {
+        handler.stop()
+      }
+      rootHandler.removeHandler(handler)
+    }
+  }
 
   def stop() {
-    server.foreach(_.stop())
+    assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!")
+    serverInfo.get.server.stop()
   }
 }
 
 private[spark] object MasterWebUI {
-  val STATIC_RESOURCE_DIR = "org/apache/spark/ui"
+  val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index afaabed..5e0fc31 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -124,7 +124,7 @@ private[spark] class Worker(
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    webUi.start()
+    webUi.bind()
     registerWithMaster()
 
     metricsSystem.registerSource(workerSource)
@@ -150,8 +150,7 @@ private[spark] class Worker(
     for (masterUrl <- masterUrls) {
       logInfo("Connecting to master " + masterUrl + "...")
       val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
-      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get,
-        publicAddress)
+      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
     }
   }
 
@@ -340,10 +339,15 @@ private[spark] object Worker {
     actorSystem.awaitTermination()
   }
 
-  def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
-      masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
-      : (ActorSystem, Int) =
-  {
+  def startSystemAndActor(
+      host: String,
+      port: Int,
+      webUiPort: Int,
+      cores: Int,
+      memory: Int,
+      masterUrls: Array[String],
+      workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
     val conf = new SparkConf
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
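
For context, a sketch of a call against the reformatted signature (illustrative values only; the host, ports, master URL, and work directory below are placeholders, not part of this patch):

    val (workerSystem, workerPort) = Worker.startSystemAndActor(
      host = "localhost",
      port = 0,
      webUiPort = 8081,
      cores = 2,
      memory = 1024,
      masterUrls = Array("spark://localhost:7077"),
      workDir = "/tmp/spark-work")
    workerSystem.awaitTermination()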

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 4e33b33..de76a5d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -19,12 +19,12 @@ package org.apache.spark.deploy.worker.ui
 
 import java.io.File
 import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.server.Server
+
 import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.ui.{JettyUtils, UIUtils}
+import org.apache.spark.ui.{JettyUtils, ServerInfo, SparkUI, UIUtils}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -33,37 +33,35 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 private[spark]
 class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
-    extends Logging {
-  val timeout = AkkaUtils.askTimeout(worker.conf)
-  val host = Utils.localHostName()
-  val port = requestedPort.getOrElse(
-    worker.conf.get("worker.ui.port",  WorkerWebUI.DEFAULT_PORT).toInt)
-
-  var server: Option[Server] = None
-  var boundPort: Option[Int] = None
-
-  val indexPage = new IndexPage(this)
+  extends Logging {
 
-  val metricsHandlers = worker.metricsSystem.getServletHandlers
+  val timeout = AkkaUtils.askTimeout(worker.conf)
 
-  val handlers = metricsHandlers ++ Seq[ServletContextHandler](
-    createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE + "/static", "/static"),
-    createServletHandler("/log", createServlet((request: HttpServletRequest) => log(request),
-      worker.securityMgr)),
-    createServletHandler("/logPage", createServlet((request: HttpServletRequest) => logPage
-      (request), worker.securityMgr)),
-    createServletHandler("/json", createServlet((request: HttpServletRequest) => indexPage
-      .renderJson(request), worker.securityMgr)),
-    createServletHandler("*", createServlet((request: HttpServletRequest) => indexPage.render
-      (request), worker.securityMgr))
-  )
+  private val host = Utils.localHostName()
+  private val port = requestedPort.getOrElse(
+    worker.conf.get("worker.ui.port",  WorkerWebUI.DEFAULT_PORT).toInt)
+  private val indexPage = new IndexPage(this)
+  private var serverInfo: Option[ServerInfo] = None
+
+  private val handlers: Seq[ServletContextHandler] = {
+    worker.metricsSystem.getServletHandlers ++
+    Seq[ServletContextHandler](
+      createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"),
+      createServletHandler("/log",
+        (request: HttpServletRequest) => log(request), worker.securityMgr),
+      createServletHandler("/logPage",
+        (request: HttpServletRequest) => logPage(request), worker.securityMgr),
+      createServletHandler("/json",
+        (request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr),
+      createServletHandler("/",
+        (request: HttpServletRequest) => indexPage.render(request), worker.securityMgr)
+    )
+  }
 
-  def start() {
+  def bind() {
     try {
-      val (srv, bPort) = JettyUtils.startJettyServer(host, port, handlers, worker.conf)
-      server = Some(srv)
-      boundPort = Some(bPort)
-      logInfo("Started Worker web UI at http://%s:%d".format(host, bPort))
+      serverInfo = Some(JettyUtils.startJettyServer(host, port, handlers, worker.conf))
+      logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>
         logError("Failed to create Worker JettyUtils", e)
@@ -71,7 +69,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
     }
   }
 
-  def log(request: HttpServletRequest): String = {
+  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
+
+  private def log(request: HttpServletRequest): String = {
     val defaultBytes = 100 * 1024
 
     val appId = Option(request.getParameter("appId"))
@@ -98,7 +98,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
     pre + Utils.offsetBytes(path, startByte, endByte)
   }
 
-  def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
+  private def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
     val defaultBytes = 100 * 1024
     val appId = Option(request.getParameter("appId"))
     val executorId = Option(request.getParameter("executorId"))
@@ -119,17 +119,14 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
     val (startByte, endByte) = getByteRange(path, offset, byteLength)
     val file = new File(path)
     val logLength = file.length
-
     val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
-
     val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
-
     val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
 
     val backButton =
       if (startByte > 0) {
         <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
-          .format(params, logType, math.max(startByte-byteLength, 0), byteLength)}>
+          .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}>
           <button type="button" class="btn btn-default">
             Previous {Utils.bytesToString(math.min(byteLength, startByte))}
           </button>
@@ -146,7 +143,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
         <a href={"?%s&logType=%s&offset=%s&byteLength=%s".
           format(params, logType, endByte, byteLength)}>
           <button type="button" class="btn btn-default">
-            Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))}
+            Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
           </button>
         </a>
       }
@@ -175,33 +172,28 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
   }
 
   /** Determine the byte range for a log or log page. */
-  def getByteRange(path: String, offset: Option[Long], byteLength: Int)
-  : (Long, Long) = {
+  private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
     val defaultBytes = 100 * 1024
     val maxBytes = 1024 * 1024
-
     val file = new File(path)
     val logLength = file.length()
-    val getOffset = offset.getOrElse(logLength-defaultBytes)
-
+    val getOffset = offset.getOrElse(logLength - defaultBytes)
     val startByte =
       if (getOffset < 0) 0L
       else if (getOffset > logLength) logLength
       else getOffset
-
     val logPageLength = math.min(byteLength, maxBytes)
-
     val endByte = math.min(startByte + logPageLength, logLength)
-
     (startByte, endByte)
   }
 
   def stop() {
-    server.foreach(_.stop())
+    assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!")
+    serverInfo.get.server.stop()
   }
 }
 
 private[spark] object WorkerWebUI {
-  val STATIC_RESOURCE_BASE = "org/apache/spark/ui"
+  val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
   val DEFAULT_PORT="8081"
 }
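
The byte-range logic above is worth spelling out; a standalone sketch of the same clamping (it mirrors the private getByteRange, with the File lookup replaced by a plain logLength parameter):

    def clampByteRange(logLength: Long, offset: Option[Long], byteLength: Int): (Long, Long) = {
      val defaultBytes = 100 * 1024        // default window: the last 100 KB of the log
      val maxBytes = 1024 * 1024           // never serve more than 1 MB per page
      val requested = offset.getOrElse(logLength - defaultBytes)
      val startByte = math.min(math.max(requested, 0L), logLength)  // clamp into [0, logLength]
      val endByte = math.min(startByte + math.min(byteLength, maxBytes), logLength)
      (startByte, endByte)
    }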

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2ea2ec2..8fe9b84 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -224,10 +224,10 @@ private[spark] class Executor(
 
         for (m <- task.metrics) {
           m.hostname = Utils.localHostName()
-          m.executorDeserializeTime = (taskStart - startTime).toInt
-          m.executorRunTime = (taskFinish - taskStart).toInt
+          m.executorDeserializeTime = taskStart - startTime
+          m.executorRunTime = taskFinish - taskStart
           m.jvmGCTime = gcTime - startGCTime
-          m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
+          m.resultSerializationTime = afterSerialization - beforeSerialization
         }
 
         val accumUpdates = Accumulators.values
@@ -263,7 +263,7 @@ private[spark] class Executor(
         }
 
         case t: Throwable => {
-          val serviceTime = (System.currentTimeMillis() - taskStart).toInt
+          val serviceTime = System.currentTimeMillis() - taskStart
           val metrics = attemptedTask.flatMap(t => t.metrics)
           for (m <- metrics) {
             m.executorRunTime = serviceTime
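
A brief note on the type change above: the raw millisecond deltas are already Longs, so keeping them as Longs drops the casts and the truncation. A minimal illustration:

    val start = System.currentTimeMillis()
    // ... run the task body ...
    val finish = System.currentTimeMillis()
    val runTimeMs: Long = finish - start   // kept as Long; an Int would overflow past ~24.8 days of elapsed time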

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 760458c..88625e7 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,21 +17,23 @@
 
 package org.apache.spark.executor
 
+import org.apache.spark.storage.{BlockId, BlockStatus}
+
 class TaskMetrics extends Serializable {
   /**
-   * Host's name the task runs on 
+   * Host's name the task runs on
    */
   var hostname: String = _
 
   /**
    * Time taken on the executor to deserialize this task
    */
-  var executorDeserializeTime: Int = _
+  var executorDeserializeTime: Long = _
 
   /**
    * Time the executor spends actually running the task (including fetching shuffle data)
    */
-  var executorRunTime: Int = _
+  var executorRunTime: Long = _
 
   /**
    * The number of bytes this task transmitted back to the driver as the TaskResult
@@ -68,6 +70,11 @@ class TaskMetrics extends Serializable {
    * here
    */
   var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
+
+  /**
+   * Storage statuses of any blocks that have been updated as a result of this task.
+   */
+  var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
 }
 
 object TaskMetrics {
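
For the new updatedBlocks field, a sketch of how a listener-side consumer might read it (assumes a `metrics: TaskMetrics` reported by a completed task; the field defaults to None when no blocks were touched):

    metrics.updatedBlocks.foreach { blocks =>
      for ((blockId, blockStatus) <- blocks) {
        // each entry pairs a block's id with its storage status after this task
        println(s"block $blockId is now $blockStatus")
      }
    }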

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 848b5c4..059e588 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -38,8 +38,7 @@ trait CompressionCodec {
 
 private[spark] object CompressionCodec {
   def createCodec(conf: SparkConf): CompressionCodec = {
-    createCodec(conf, conf.get(
-      "spark.io.compression.codec", classOf[LZFCompressionCodec].getName))
+    createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
@@ -47,6 +46,8 @@ private[spark] object CompressionCodec {
       .getConstructor(classOf[SparkConf])
     ctor.newInstance(conf).asInstanceOf[CompressionCodec]
   }
+
+  val DEFAULT_COMPRESSION_CODEC = classOf[LZFCompressionCodec].getName
 }
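
For context, a sketch of the two createCodec entry points after this change (spark-internal API, so this only compiles inside the org.apache.spark package; `conf` is a placeholder SparkConf):

    import org.apache.spark.SparkConf
    import org.apache.spark.io.CompressionCodec

    val conf = new SparkConf()
    // uses spark.io.compression.codec if set, else the new DEFAULT_COMPRESSION_CODEC (LZF)
    val defaultCodec = CompressionCodec.createCodec(conf)
    val explicitCodec = CompressionCodec.createCodec(conf, CompressionCodec.DEFAULT_COMPRESSION_CODEC)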
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 3110ecc..854b52c 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -28,7 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.SecurityManager
-import org.apache.spark.ui.JettyUtils
+import org.apache.spark.ui.JettyUtils._
 
 class MetricsServlet(val property: Properties, val registry: MetricRegistry,
     securityMgr: SecurityManager) extends Sink {
@@ -46,10 +46,8 @@ class MetricsServlet(val property: Properties, val registry: MetricRegistry,
     new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
 
   def getHandlers = Array[ServletContextHandler](
-    JettyUtils.createServletHandler(servletPath, 
-      JettyUtils.createServlet(
-        new JettyUtils.ServletParams(request => getMetricsSnapshot(request), "text/json"),
-        securityMgr) )
+    createServletHandler(servletPath,
+      new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr)
   )
 
   def getMetricsSnapshot(request: HttpServletRequest): String = {
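
The simplification above removes the extra createServlet wrapping; a sketch of the new call shape (the path here is illustrative, and JettyUtils._ is assumed to be imported as in this file):

    val handler = createServletHandler(
      "/metrics/json",                                               // illustrative servlet path
      new ServletParams(request => getMetricsSnapshot(request), "text/json"),
      securityMgr)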

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ddb9012..1b43040 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -137,9 +137,8 @@ abstract class RDD[T: ClassTag](
       throw new UnsupportedOperationException(
         "Cannot change storage level of an RDD after it was already assigned a level")
     }
+    sc.persistRDD(this)
     storageLevel = newLevel
-    // Register the RDD with the SparkContext
-    sc.persistentRdds(id) = this
     this
   }
 
@@ -157,8 +156,7 @@ abstract class RDD[T: ClassTag](
    */
   def unpersist(blocking: Boolean = true): RDD[T] = {
     logInfo("Removing RDD " + id + " from persistence list")
-    sc.env.blockManager.master.removeRdd(id, blocking)
-    sc.persistentRdds.remove(id)
+    sc.unpersistRDD(this, blocking)
     storageLevel = StorageLevel.NONE
     this
   }
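
The user-facing API is unchanged by the refactor above; registration simply moves behind SparkContext. A sketch of the usual call pattern (assumes an existing SparkContext `sc` and a placeholder input path):

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.textFile("hdfs:///path/to/data").persist(StorageLevel.MEMORY_ONLY)
    rdd.count()                       // first action computes and caches the partitions
    rdd.unpersist(blocking = true)    // drops the cached blocks and unregisters the RDD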