You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/10 19:39:41 UTC

[1/2] [SPARK-1276] Add a HistoryServer to render persisted UI

Repository: spark
Updated Branches:
  refs/heads/master a74fbbbca -> 79820fe82


http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 0342a8a..f75297a 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import java.util.{Properties, UUID}
+import java.util.Properties
 
 import scala.collection.Map
 
@@ -52,6 +52,8 @@ class JsonProtocolSuite extends FunSuite {
     val blockManagerRemoved = SparkListenerBlockManagerRemoved(
       BlockManagerId("Scarce", "to be counted...", 100, 200))
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
+    val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
+    val applicationEnd = SparkListenerApplicationEnd(42L)
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
@@ -64,6 +66,8 @@ class JsonProtocolSuite extends FunSuite {
     testEvent(blockManagerAdded, blockManagerAddedJsonString)
     testEvent(blockManagerRemoved, blockManagerRemovedJsonString)
     testEvent(unpersistRdd, unpersistRDDJsonString)
+    testEvent(applicationStart, applicationStartJsonString)
+    testEvent(applicationEnd, applicationEndJsonString)
   }
 
   test("Dependent Classes") {
@@ -208,7 +212,13 @@ class JsonProtocolSuite extends FunSuite {
       case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) =>
         assertEquals(e1.blockManagerId, e2.blockManagerId)
       case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) =>
-        assert(e1.rddId === e2.rddId)
+        assert(e1.rddId == e2.rddId)
+      case (e1: SparkListenerApplicationStart, e2: SparkListenerApplicationStart) =>
+        assert(e1.appName == e2.appName)
+        assert(e1.time == e2.time)
+        assert(e1.sparkUser == e2.sparkUser)
+      case (e1: SparkListenerApplicationEnd, e2: SparkListenerApplicationEnd) =>
+        assert(e1.time == e2.time)
       case (SparkListenerShutdown, SparkListenerShutdown) =>
       case _ => fail("Events don't match in types!")
     }
@@ -553,4 +563,14 @@ class JsonProtocolSuite extends FunSuite {
       {"Event":"SparkListenerUnpersistRDD","RDD ID":12345}
     """
 
+  private val applicationStartJsonString =
+    """
+      {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","Timestamp":42,
+      "User":"Garfield"}
+    """
+
+  private val applicationEndJsonString =
+    """
+      {"Event":"SparkListenerApplicationEnd","Timestamp":42}
+    """
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 15bfb04..4c91c3a 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -12,17 +12,77 @@ displays useful information about the application. This includes:
 
 * A list of scheduler stages and tasks
 * A summary of RDD sizes and memory usage
-* Information about the running executors
 * Environmental information.
+* Information about the running executors
 
 You can access this interface by simply opening `http://<driver-node>:4040` in a web browser.
-If multiple SparkContexts are running on the same host, they will bind to succesive ports
+If multiple SparkContexts are running on the same host, they will bind to successive ports
 beginning with 4040 (4041, 4042, etc).
 
-Spark's Standalone Mode cluster manager also has its own
-[web UI](spark-standalone.html#monitoring-and-logging). 
+Note that this information is only available for the duration of the application by default.
+To view the web UI after the fact, set `spark.eventLog.enabled` to true before starting the
+application. This configures Spark to log Spark events that encode the information displayed
+in the UI to persisted storage.
 
-Note that in both of these UIs, the tables are sortable by clicking their headers,
+## Viewing After the Fact
+
+Spark's Standalone Mode cluster manager also has its own
+[web UI](spark-standalone.html#monitoring-and-logging). If an application has logged events over
+the course of its lifetime, then the Standalone master's web UI will automatically re-render the
+application's UI after the application has finished.
+
+If Spark is run on Mesos or YARN, it is still possible to reconstruct the UI of a finished
+application through Spark's history server, provided that the application's event logs exist.
+You can start the history server by executing:
+
+    ./sbin/start-history-server.sh <base-logging-directory>
+
+The base logging directory must be supplied, and should contain sub-directories that each
+represent an application's event logs. This creates a web interface at
+`http://<server-url>:18080` by default. The history server depends on the following variables:
+
+<table class="table">
+  <tr><th style="width:21%">Environment Variable</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>SPARK_DAEMON_MEMORY</code></td>
+    <td>Memory to allocate to the history server (default: 512m).</td>
+  </tr>
+  <tr>
+    <td><code>SPARK_DAEMON_JAVA_OPTS</code></td>
+    <td>JVM options for the history server (default: none).</td>
+  </tr>
+</table>
+
+Further, the history server can be configured as follows:
+
+<table class="table">
+  <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+  <tr>
+    <td>spark.history.updateInterval</td>
+    <td>10</td>
+    <td>
+      The period, in seconds, at which information displayed by this history server is updated.
+      Each update checks for any changes made to the event logs in persisted storage.
+    </td>
+  </tr>
+  <tr>
+    <td>spark.history.retainedApplications</td>
+    <td>250</td>
+    <td>
+      The number of application UIs to retain. If this cap is exceeded, then the oldest
+      applications will be removed.
+    </td>
+  </tr>
+  <tr>
+    <td>spark.history.ui.port</td>
+    <td>18080</td>
+    <td>
+      The port to which the web interface of the history server binds.
+    </td>
+  </tr>
+</table>
+
+Note that in all of these UIs, the tables are sortable by clicking their headers,
 making it easy to identify slow tasks, data skew, etc.
 
 # Metrics

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index 3ebf288..910b31d 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -116,14 +116,14 @@ trait SparkILoopInit {
     }
   }
 
- def initializeSpark() {
+  def initializeSpark() {
     intp.beQuietDuring {
       command("""
          @transient val sc = org.apache.spark.repl.Main.interp.createSparkContext();
         """)
       command("import org.apache.spark.SparkContext._")
     }
-   echo("Spark context available as sc.")
+    echo("Spark context available as sc.")
   }
 
   // code to be executed only after the interpreter is initialized

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/sbin/start-history-server.sh
----------------------------------------------------------------------
diff --git a/sbin/start-history-server.sh b/sbin/start-history-server.sh
new file mode 100755
index 0000000..4a90c68
--- /dev/null
+++ b/sbin/start-history-server.sh
@@ -0,0 +1,37 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Starts the history server on the machine this script is executed on.
+#
+# Usage: start-history-server.sh <base-log-dir>
+#   Example: ./start-history-server.sh /tmp/spark-events
+#
+
+sbin=`dirname "$0"`
+sbin=`cd "$sbin"; pwd`
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./start-history-server.sh <base-log-dir>"
+  echo "Example: ./start-history-server.sh /tmp/spark-events"
+  exit
+fi
+
+LOG_DIR=$1
+
+"$sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 --dir "$LOG_DIR"

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/sbin/stop-history-server.sh
----------------------------------------------------------------------
diff --git a/sbin/stop-history-server.sh b/sbin/stop-history-server.sh
new file mode 100755
index 0000000..c0034ad
--- /dev/null
+++ b/sbin/stop-history-server.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stops the history server on the machine this script is executed on.
+
+sbin=`dirname "$0"`
+sbin=`cd "$sbin"; pwd`
+
+"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.history.HistoryServer 1


[2/2] git commit: [SPARK-1276] Add a HistoryServer to render persisted UI

Posted by pw...@apache.org.
[SPARK-1276] Add a HistoryServer to render persisted UI

The new feature of event logging, introduced in #42, allows the user to persist the details of his/her Spark application to storage, and later replay these events to reconstruct an after-the-fact SparkUI.
Currently, however, a persisted UI can only be rendered through the standalone Master. This greatly limits the use case of this new feature as many people also run Spark on Yarn / Mesos.

This PR introduces a new entity called the HistoryServer, which, given a log directory, keeps track of all completed applications independently of a Spark Master. Unlike Master, the HistoryServer need not be running while the application is still running. It is relatively light-weight in that it only maintains static information of applications and performs no scheduling.

To quickly test it out, generate event logs with ```spark.eventLog.enabled=true``` and run ```sbin/start-history-server.sh <log-dir-path>```. Your HistoryServer awaits on port 18080.

Comments and feedback are most welcome.

---

A few other changes introduced in this PR include refactoring the WebUI interface, which is beginning to have a lot of duplicate code now that we have added more functionality to it. Two new SparkListenerEvents have been introduced (SparkListenerApplicationStart/End) to keep track of application name and start/finish times. This PR also clarifies the semantics of the ReplayListenerBus introduced in #42.

A potential TODO in the future (not part of this PR) is to render live applications in addition to just completed applications. This is useful when applications fail, a condition that our current HistoryServer does not handle unless the user manually signals application completion (by creating the APPLICATION_COMPLETION file). Handling live applications becomes significantly more challenging, however, because it is now necessary to render the same SparkUI multiple times. To avoid reading the entire log every time, which is inefficient, we must handle reading the log from where we previously left off, but this becomes fairly complicated because we must deal with the arbitrary behavior of each input stream.

Author: Andrew Or <an...@gmail.com>

Closes #204 from andrewor14/master and squashes the following commits:

7b7234c [Andrew Or] Finished -> Completed
b158d98 [Andrew Or] Address Patrick's comments
69d1b41 [Andrew Or] Do not block on posting SparkListenerApplicationEnd
19d5dd0 [Andrew Or] Merge github.com:apache/spark
f7f5bf0 [Andrew Or] Make history server's web UI port a Spark configuration
2dfb494 [Andrew Or] Decouple checking for application completion from replaying
d02dbaa [Andrew Or] Expose Spark version and include it in event logs
2282300 [Andrew Or] Add documentation for the HistoryServer
567474a [Andrew Or] Merge github.com:apache/spark
6edf052 [Andrew Or] Merge github.com:apache/spark
19e1fb4 [Andrew Or] Address Thomas' comments
248cb3d [Andrew Or] Limit number of live applications + add configurability
a3598de [Andrew Or] Do not close file system with ReplayBus + fix bind address
bc46fc8 [Andrew Or] Merge github.com:apache/spark
e2f4ff9 [Andrew Or] Merge github.com:apache/spark
050419e [Andrew Or] Merge github.com:apache/spark
81b568b [Andrew Or] Fix strange error messages...
0670743 [Andrew Or] Decouple page rendering from loading files from disk
1b2f391 [Andrew Or] Minor changes
a9eae7e [Andrew Or] Merge branch 'master' of github.com:apache/spark
d5154da [Andrew Or] Styling and comments
5dbfbb4 [Andrew Or] Merge branch 'master' of github.com:apache/spark
60bc6d5 [Andrew Or] First complete implementation of HistoryServer (only for finished apps)
7584418 [Andrew Or] Report application start/end times to HistoryServer
8aac163 [Andrew Or] Add basic application table
c086bd5 [Andrew Or] Add HistoryServer and scripts ++ Refactor WebUI interface


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79820fe8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79820fe8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79820fe8

Branch: refs/heads/master
Commit: 79820fe825ed7c09d55f50503b7ab53d4585e5f7
Parents: a74fbbb
Author: Andrew Or <an...@gmail.com>
Authored: Thu Apr 10 10:39:34 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Apr 10 10:39:34 2014 -0700

----------------------------------------------------------------------
 bin/spark-class                                 |   8 +-
 bin/spark-class2.cmd                            |   7 +-
 .../scala/org/apache/spark/SparkContext.scala   |  26 +-
 .../spark/deploy/ApplicationDescription.scala   |   4 +-
 .../apache/spark/deploy/SparkUIContainer.scala  |  50 ++++
 .../spark/deploy/history/HistoryServer.scala    | 287 +++++++++++++++++++
 .../deploy/history/HistoryServerArguments.scala |  76 +++++
 .../apache/spark/deploy/history/IndexPage.scala |  82 ++++++
 .../org/apache/spark/deploy/master/Master.scala |  62 ++--
 .../spark/deploy/master/ui/MasterWebUI.scala    |  43 +--
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |  22 +-
 .../scheduler/ApplicationEventListener.scala    |  50 ++++
 .../spark/scheduler/EventLoggingListener.scala  | 146 +++++++++-
 .../spark/scheduler/ReplayListenerBus.scala     |  65 ++---
 .../apache/spark/scheduler/SparkListener.scala  |  15 +
 .../spark/scheduler/SparkListenerBus.scala      |   4 +
 .../cluster/SparkDeploySchedulerBackend.scala   |   2 +-
 .../org/apache/spark/storage/FileSegment.scala  |   2 +-
 .../scala/org/apache/spark/ui/SparkUI.scala     |  49 ++--
 .../main/scala/org/apache/spark/ui/WebUI.scala  |  21 +-
 .../org/apache/spark/ui/env/EnvironmentUI.scala |   3 +-
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  |   3 +-
 .../org/apache/spark/ui/jobs/IndexPage.scala    |   3 +-
 .../apache/spark/ui/jobs/JobProgressUI.scala    |   3 +-
 .../org/apache/spark/ui/jobs/PoolPage.scala     |   3 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   3 +-
 .../spark/ui/storage/BlockManagerUI.scala       |   3 +-
 .../org/apache/spark/ui/storage/IndexPage.scala |   3 +-
 .../org/apache/spark/ui/storage/RDDPage.scala   |   3 +-
 .../org/apache/spark/util/FileLogger.scala      |  27 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  31 ++
 .../scala/org/apache/spark/util/Utils.scala     |   8 +-
 .../ui/jobs/JobProgressListenerSuite.scala      |   2 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |  24 +-
 docs/monitoring.md                              |  70 ++++-
 .../org/apache/spark/repl/SparkILoopInit.scala  |   4 +-
 sbin/start-history-server.sh                    |  37 +++
 sbin/stop-history-server.sh                     |  25 ++
 38 files changed, 1075 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/bin/spark-class
----------------------------------------------------------------------
diff --git a/bin/spark-class b/bin/spark-class
index 76fde3e..1b0d309 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -47,9 +47,9 @@ DEFAULT_MEM=${SPARK_MEM:-512m}
 
 SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
 
-# Add java opts and memory settings for master, worker, executors, and repl.
+# Add java opts and memory settings for master, worker, history server, executors, and repl.
 case "$1" in
-  # Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+  # Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
   'org.apache.spark.deploy.master.Master')
     OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
     OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
@@ -58,6 +58,10 @@ case "$1" in
     OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
     OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
     ;;
+  'org.apache.spark.deploy.history.HistoryServer')
+    OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS"
+    OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
+    ;;
 
   # Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
   'org.apache.spark.executor.CoarseGrainedExecutorBackend')

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/bin/spark-class2.cmd
----------------------------------------------------------------------
diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
index f488cfd..4302c1b 100755
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -45,14 +45,17 @@ if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m
 
 set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
 
-rem Add java opts and memory settings for master, worker, executors, and repl.
-rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+rem Add java opts and memory settings for master, worker, history server, executors, and repl.
+rem Master, Worker and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
 if "%1"=="org.apache.spark.deploy.master.Master" (
   set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
   if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
 ) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
   set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
   if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
+) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" (
+  set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS%
+  if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
 
 rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
 ) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7630523..e6c9b70 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -219,15 +219,12 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
       val logger = new EventLoggingListener(appName, conf)
+      logger.start()
       listenerBus.addListener(logger)
       Some(logger)
     } else None
   }
 
-  // Information needed to replay logged events, if any
-  private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
-    eventLogger.map { logger => Some(logger.info) }.getOrElse(None)
-
   // At this point, all relevant SparkListeners have been registered, so begin releasing events
   listenerBus.start()
 
@@ -292,6 +289,7 @@ class SparkContext(config: SparkConf) extends Logging {
   cleaner.foreach(_.start())
 
   postEnvironmentUpdate()
+  postApplicationStart()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration: Configuration = {
@@ -777,6 +775,9 @@ class SparkContext(config: SparkConf) extends Logging {
     listenerBus.addListener(listener)
   }
 
+  /** The version of Spark on which this application is running. */
+  def version = SparkContext.SPARK_VERSION
+
   /**
    * Return a map from the slave to the max memory available for caching and the remaining
    * memory available for caching.
@@ -930,6 +931,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /** Shut down the SparkContext. */
   def stop() {
+    postApplicationEnd()
     ui.stop()
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
@@ -1175,6 +1177,20 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
 
+  /** Post the application start event */
+  private def postApplicationStart() {
+    listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
+  }
+
+  /**
+   * Post the application end event through listenerBus.post, the same call used for the
+   * application start event. NOTE(review): if post only enqueues the event, listeners may
+   * stop before this event has been propagated -- confirm the bus is drained on stop().
+   */
+  private def postApplicationEnd() {
+    listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
+  }
+
   /** Post the environment update event once the task scheduler is ready */
   private def postEnvironmentUpdate() {
     if (taskScheduler != null) {
@@ -1200,6 +1216,8 @@ class SparkContext(config: SparkConf) extends Logging {
  */
 object SparkContext extends Logging {
 
+  private[spark] val SPARK_VERSION = "1.0.0"
+
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
 
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 15fa8a7..86305d2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.deploy
 
-import org.apache.spark.scheduler.EventLoggingInfo
-
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
@@ -26,7 +24,7 @@ private[spark] class ApplicationDescription(
     val command: Command,
     val sparkHome: Option[String],
     var appUiUrl: String,
-    val eventLogInfo: Option[EventLoggingInfo] = None)
+    val eventLogDir: Option[String] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
new file mode 100644
index 0000000..33fceae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.spark.ui.{SparkUI, WebUI}
+
+private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) {
+
+  /** Attach a SparkUI to this container. Only valid after bind(). */
+  def attachUI(ui: SparkUI) {
+    assert(serverInfo.isDefined,
+      "%s must be bound to a server before attaching SparkUIs".format(name))
+    val rootHandler = serverInfo.get.rootHandler
+    for (handler <- ui.handlers) {
+      rootHandler.addHandler(handler)
+      if (!handler.isStarted) {
+        handler.start()
+      }
+    }
+  }
+
+  /** Detach a SparkUI from this container. Only valid after bind(). */
+  def detachUI(ui: SparkUI) {
+    assert(serverInfo.isDefined,
+      "%s must be bound to a server before detaching SparkUIs".format(name))
+    val rootHandler = serverInfo.get.rootHandler
+    for (handler <- ui.handlers) {
+      if (handler.isStarted) {
+        handler.stop()
+      }
+      rootHandler.removeHandler(handler)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
new file mode 100644
index 0000000..97d2ba9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.eclipse.jetty.servlet.ServletContextHandler
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkUIContainer
+import org.apache.spark.scheduler._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
+
+/**
+ * A web server that renders SparkUIs of completed applications.
+ *
+ * For the standalone mode, MasterWebUI already achieves this functionality. Thus, the
+ * main use case of the HistoryServer is in other deploy modes (e.g. Yarn or Mesos).
+ *
+ * The logging directory structure is as follows: Within the given base directory, each
+ * application's event logs are maintained in the application's own sub-directory. This
+ * is the same structure as maintained in the event log write code path in
+ * EventLoggingListener.
+ *
+ * @param baseLogDir The base directory in which event logs are found
+ * @param conf The SparkConf used to create the security manager and the Jetty server
+ */
+class HistoryServer(
+    val baseLogDir: String,
+    conf: SparkConf)
+  extends SparkUIContainer("History Server") with Logging {
+
+  import HistoryServer._
+
+  private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
+  private val localHost = Utils.localHostName()
+  private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
+  private val port = WEB_UI_PORT
+  private val securityManager = new SecurityManager(conf)
+  private val indexPage = new IndexPage(this)
+
+  // A timestamp of when the disk was last accessed to check for log updates.
+  // It is written inside the synchronized checkForLogs() but read by the log checking
+  // thread without holding the lock, hence volatile.
+  @volatile private var lastLogCheckTime = -1L
+
+  // Number of completed applications found in this directory
+  private var numCompletedApplications = 0
+
+  @volatile private var stopped = false
+
+  /**
+   * A background thread that periodically checks for event log updates on disk.
+   *
+   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
+   * time at which it performs the next log check to maintain the same period as before.
+   *
+   * TODO: Add a mechanism to update manually.
+   */
+  private val logCheckingThread = new Thread {
+    override def run() {
+      while (!stopped) {
+        val now = System.currentTimeMillis
+        // Read the timestamp once so that the comparison and the sleep computation
+        // below are based on the same value
+        val lastCheck = lastLogCheckTime
+        if (now - lastCheck > UPDATE_INTERVAL_MS) {
+          checkForLogs()
+          Thread.sleep(UPDATE_INTERVAL_MS)
+        } else {
+          // If the user has manually checked for logs recently, wait until
+          // UPDATE_INTERVAL_MS after the last check time
+          Thread.sleep(lastCheck + UPDATE_INTERVAL_MS - now)
+        }
+      }
+    }
+  }
+
+  private val handlers = Seq[ServletContextHandler](
+    createStaticHandler(STATIC_RESOURCE_DIR, "/static"),
+    createServletHandler("/",
+      (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
+  )
+
+  // A mapping of application ID to its history information, which includes the rendered UI
+  val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
+
+  /**
+   * Start the history server.
+   *
+   * This starts a background thread that periodically synchronizes information displayed on
+   * this UI with the event logs in the provided base directory.
+   */
+  def start() {
+    logCheckingThread.start()
+  }
+
+  /** Bind to the HTTP server behind this web interface. Exits the JVM if binding fails. */
+  override def bind() {
+    try {
+      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+      logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
+    } catch {
+      case e: Exception =>
+        logError("Failed to bind HistoryServer", e)
+        System.exit(1)
+    }
+  }
+
+  /**
+   * Check for any updates to event logs in the base directory. This is only effective once
+   * the server has been bound.
+   *
+   * If a new completed application is found, the server renders the associated SparkUI
+   * from the application's event logs, attaches this UI to itself, and stores metadata
+   * information for this application.
+   *
+   * If the logs for an existing completed application are no longer found, or the
+   * application is no longer among the RETAINED_APPLICATIONS most recently modified ones,
+   * the server removes all associated information and detaches the SparkUI.
+   *
+   * Any exception raised during the check is logged and not propagated.
+   */
+  def checkForLogs() = synchronized {
+    if (serverInfo.isDefined) {
+      lastLogCheckTime = System.currentTimeMillis
+      logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
+      try {
+        val logStatus = fileSystem.listStatus(new Path(baseLogDir))
+        val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+        val logInfos = logDirs
+          .sortBy { dir => getModificationTime(dir) }
+          .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
+          .filter { case (dir, info) => info.applicationComplete }
+
+        // Logging information for applications that should be retained
+        val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS)
+        val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName }.toSet
+
+        // Remove any applications that should no longer be retained. The entries to evict
+        // are first copied into a strict list because removing from a mutable HashMap
+        // while iterating over that same map is not safe.
+        val staleApps = appIdToInfo.toList.filter { case (appId, _) =>
+          !retainedAppIds.contains(appId)
+        }
+        staleApps.foreach { case (appId, info) =>
+          detachUI(info.ui)
+          appIdToInfo.remove(appId)
+        }
+
+        // Render the application's UI if it is not already there
+        retainedLogInfos.foreach { case (dir, info) =>
+          val appId = dir.getPath.getName
+          if (!appIdToInfo.contains(appId)) {
+            renderSparkUI(dir, info)
+          }
+        }
+
+        // Track the total number of completed applications observed this round
+        numCompletedApplications = logInfos.size
+
+      } catch {
+        case t: Throwable => logError("Exception in checking for event log updates", t)
+      }
+    } else {
+      logWarning("Attempted to check for event log updates before binding the server.")
+    }
+  }
+
+  /**
+   * Render a new SparkUI by replaying the event logs found in the given directory.
+   *
+   * This method does not itself check for application completion; checkForLogs() only
+   * passes in directories whose logging info reports a completed application. The UI is
+   * attached to this server and recorded in appIdToInfo only if an application start
+   * event was seen during the replay. Otherwise, the server does nothing further.
+   *
+   * Note that the second parameter shadows Logging's logInfo method within this body.
+   */
+  private def renderSparkUI(logDir: FileStatus, logInfo: EventLoggingInfo) {
+    val path = logDir.getPath
+    val appId = path.getName
+    val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
+    val ui = new SparkUI(replayBus, appId, "/history/" + appId)
+    val appListener = new ApplicationEventListener
+    replayBus.addListener(appListener)
+
+    // Do not call ui.bind() to avoid creating a new server for each application
+    ui.start()
+    replayBus.replay()
+    if (appListener.applicationStarted) {
+      attachUI(ui)
+      val appName = appListener.appName
+      val sparkUser = appListener.sparkUser
+      val startTime = appListener.startTime
+      val endTime = appListener.endTime
+      val lastUpdated = getModificationTime(logDir)
+      ui.setAppName(appName + " (completed)")
+      appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime,
+        lastUpdated, sparkUser, path, ui)
+    }
+  }
+
+  /** Stop the server and close the file system. */
+  override def stop() {
+    // Signal the log checking thread first so that it terminates even if stopping
+    // the underlying server fails
+    stopped = true
+    super.stop()
+    fileSystem.close()
+  }
+
+  /** Return the address of this server. */
+  def getAddress: String = "http://" + publicHost + ":" + boundPort
+
+  /** Return the number of completed applications found, whether or not the UI is rendered. */
+  def getNumApplications: Int = numCompletedApplications
+
+  /**
+   * Return when this directory was last modified: the latest modification time among the
+   * files it contains, or the directory's own modification time if it has none.
+   * Returns -1 if the file system cannot be accessed.
+   */
+  private def getModificationTime(dir: FileStatus): Long = {
+    try {
+      val logFiles = fileSystem.listStatus(dir.getPath)
+      if (logFiles != null && !logFiles.isEmpty) {
+        logFiles.map(_.getModificationTime).max
+      } else {
+        dir.getModificationTime
+      }
+    } catch {
+      case t: Throwable =>
+        logError("Exception in accessing modification time of %s".format(dir.getPath), t)
+        -1L
+    }
+  }
+}
+
+/**
+ * The recommended way of starting and stopping a HistoryServer is through the scripts
+ * start-history-server.sh and stop-history-server.sh. The path to a base log directory
+ * must be specified. For example:
+ *
+ *   ./sbin/start-history-server.sh /tmp/spark-events
+ *   ./sbin/start-history-server.sh hdfs://1.2.3.4:9000/spark-events
+ *
+ * This launches the HistoryServer as a Spark daemon. The UI port, the update interval and
+ * the number of retained applications are read from the "spark.history.*" properties below.
+ */
+object HistoryServer {
+  private val conf = new SparkConf
+
+  // Interval between each check for event log updates (configured in seconds)
+  val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000
+
+  // How many applications to retain
+  val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250)
+
+  // The port to which the web UI is bound
+  val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080)
+
+  val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
+
+  /**
+   * Parse the command line arguments, then bind and start a HistoryServer, and block
+   * the calling thread until the process is terminated.
+   */
+  def main(argStrings: Array[String]) {
+    val args = new HistoryServerArguments(argStrings)
+    val server = new HistoryServer(args.logDir, conf)
+    server.bind()
+    server.start()
+
+    // The loop below never terminates, so any statement placed after it is unreachable.
+    // Stop the server from a JVM shutdown hook instead, so that it is cleaned up when
+    // the process is asked to exit.
+    Runtime.getRuntime.addShutdownHook(new Thread("HistoryServer shutdown hook") {
+      override def run() {
+        server.stop()
+      }
+    })
+
+    // Wait until the end of the world... or if the HistoryServer process is manually stopped
+    while(true) { Thread.sleep(Int.MaxValue) }
+  }
+}
+
+
+private[spark] case class ApplicationHistoryInfo(
+    id: String,
+    name: String,
+    startTime: Long,
+    endTime: Long,
+    lastUpdated: Long,
+    sparkUser: String,
+    logDirPath: Path,
+    ui: SparkUI) {
+
+  /** Whether a start time has been recorded, i.e. it differs from the -1 placeholder. */
+  def started: Boolean = startTime != -1L
+
+  /** Whether an end time has been recorded, i.e. it differs from the -1 placeholder. */
+  def completed: Boolean = endTime != -1L
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
new file mode 100644
index 0000000..943c061
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.util.Utils
+
+/**
+ * Command-line parser for the history server.
+ *
+ * Parsing and validation happen at construction time. On invalid input, or when help is
+ * requested, the usage is printed and the JVM exits.
+ */
+private[spark] class HistoryServerArguments(args: Array[String]) {
+  var logDir = ""
+
+  parse(args.toList)
+
+  // Validate exactly once, after all arguments have been consumed. Doing this inside the
+  // recursive parse() would repeat the file system lookups at every level of recursion.
+  validateLogDir()
+
+  /** Recursively consume the argument list, recording the logging directory if given. */
+  private def parse(args: List[String]): Unit = {
+    args match {
+      case ("--dir" | "-d") :: value :: tail =>
+        logDir = value
+        parse(tail)
+
+      case ("--help" | "-h") :: tail =>
+        printUsageAndExit(0)
+
+      case Nil =>
+
+      case _ =>
+        printUsageAndExit(1)
+    }
+  }
+
+  /** Exit with an error unless the logging directory is set, exists and is a directory. */
+  private def validateLogDir() {
+    if (logDir == "") {
+      System.err.println("Logging directory must be specified.")
+      printUsageAndExit(1)
+    }
+    val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
+    val path = new Path(logDir)
+    if (!fileSystem.exists(path)) {
+      System.err.println("Logging directory specified does not exist: %s".format(logDir))
+      printUsageAndExit(1)
+    }
+    if (!fileSystem.getFileStatus(path).isDir) {
+      System.err.println("Logging directory specified is not a directory: %s".format(logDir))
+      printUsageAndExit(1)
+    }
+  }
+
+  /** Print the usage message to stderr and terminate the JVM with the given exit code. */
+  private def printUsageAndExit(exitCode: Int) {
+    System.err.println(
+      "Usage: HistoryServer [options]\n" +
+      "\n" +
+      "Options:\n" +
+      "  -d DIR,  --dir DIR     Location of event log files")
+    System.exit(exitCode)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
new file mode 100644
index 0000000..54dffff
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUI}
+
+private[spark] class IndexPage(parent: HistoryServer) {
+
+  /**
+   * Render the history server's landing page: the event log location followed by a
+   * table of the rendered applications, most recently updated first.
+   */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val newestFirst = parent.appIdToInfo.values.toSeq.sortBy(-_.lastUpdated)
+    val appTable = UIUtils.listingTable(appHeader, appRow, newestFirst)
+    val content =
+      <div class="row-fluid">
+        <div class="span12">
+          <ul class="unstyled">
+            <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
+          </ul>
+          {
+            if (parent.appIdToInfo.nonEmpty) {
+              <h4>
+                Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
+                Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+              </h4> ++
+              appTable
+            } else {
+              <h4>No Completed Applications Found</h4>
+            }
+          }
+        </div>
+      </div>
+    UIUtils.basicSparkPage(content, "History Server")
+  }
+
+  // Column titles of the application table, in the order the cells are emitted by appRow
+  private val appHeader = Seq(
+    "App Name",
+    "Started",
+    "Completed",
+    "Duration",
+    "Spark User",
+    "Log Directory",
+    "Last Updated")
+
+  /** Build the table row for one application, with placeholders for missing values. */
+  private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
+    val logDirectory = info.logDirPath.getName
+    val appName = if (info.started) info.name else logDirectory
+    val sparkUser = if (info.started) info.sparkUser else "Unknown user"
+    val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
+    val endTime = if (info.completed) WebUI.formatDate(info.endTime) else "Not completed"
+    // A duration is only shown when both endpoints are known and the span is positive
+    val elapsed = info.endTime - info.startTime
+    val duration =
+      if (info.started && info.completed && elapsed > 0) {
+        WebUI.formatDuration(elapsed)
+      } else {
+        "---"
+      }
+    val lastUpdated = WebUI.formatDate(info.lastUpdated)
+    val uiAddress = parent.getAddress + info.ui.basePath
+    <tr>
+      <td><a href={uiAddress}>{appName}</a></td>
+      <td>{startTime}</td>
+      <td>{endTime}</td>
+      <td>{duration}</td>
+      <td>{sparkUser}</td>
+      <td>{logDirectory}</td>
+      <td>{lastUpdated}</td>
+    </tr>
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 95bd62e..2446e86 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,6 +29,7 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
+import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
@@ -37,7 +38,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.scheduler.ReplayListenerBus
+import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -45,7 +46,8 @@ private[spark] class Master(
     host: String,
     port: Int,
     webUiPort: Int,
-    val securityMgr: SecurityManager) extends Actor with Logging {
+    val securityMgr: SecurityManager)
+  extends Actor with Logging {
 
   import context.dispatcher   // to use Akka's scheduler.schedule()
 
@@ -71,6 +73,7 @@ private[spark] class Master(
   var nextAppNumber = 0
 
   val appIdToUI = new HashMap[String, SparkUI]
+  val fileSystemsUsed = new HashSet[FileSystem]
 
   val drivers = new HashSet[DriverInfo]
   val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -149,6 +152,7 @@ private[spark] class Master(
 
   override def postStop() {
     webUi.stop()
+    fileSystemsUsed.foreach(_.close())
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
@@ -630,11 +634,7 @@ private[spark] class Master(
       waitingApps -= app
 
       // If application events are logged, use them to rebuild the UI
-      startPersistedSparkUI(app).map { ui =>
-        app.desc.appUiUrl = ui.basePath
-        appIdToUI(app.id) = ui
-        webUi.attachUI(ui)
-      }.getOrElse {
+      if (!rebuildSparkUI(app)) {
         // Avoid broken links if the UI is not reconstructed
         app.desc.appUiUrl = ""
       }
@@ -654,30 +654,34 @@ private[spark] class Master(
   }
 
   /**
-   * Start a new SparkUI rendered from persisted storage. If this is unsuccessful for any reason,
-   * return None. Otherwise return the reconstructed UI.
+   * Rebuild a new SparkUI from the given application's event logs.
+   * Return whether this is successful.
    */
-  def startPersistedSparkUI(app: ApplicationInfo): Option[SparkUI] = {
+  def rebuildSparkUI(app: ApplicationInfo): Boolean = {
     val appName = app.desc.name
-    val eventLogInfo = app.desc.eventLogInfo.getOrElse { return None }
-    val eventLogDir = eventLogInfo.logDir
-    val eventCompressionCodec = eventLogInfo.compressionCodec
-    val appConf = new SparkConf
-    eventCompressionCodec.foreach { codec =>
-      appConf.set("spark.eventLog.compress", "true")
-      appConf.set("spark.io.compression.codec", codec)
-    }
-    val replayerBus = new ReplayListenerBus(appConf)
-    val ui = new SparkUI(
-      appConf,
-      replayerBus,
-      "%s (finished)".format(appName),
-      "/history/%s".format(app.id))
-
-    // Do not call ui.bind() to avoid creating a new server for each application
-    ui.start()
-    val success = replayerBus.replay(eventLogDir)
-    if (success) Some(ui) else None
+    val eventLogDir = app.desc.eventLogDir.getOrElse { return false }
+    val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
+    val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
+    val eventLogPaths = eventLogInfo.logPaths
+    val compressionCodec = eventLogInfo.compressionCodec
+    if (!eventLogPaths.isEmpty) {
+      try {
+        val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
+        val ui = new SparkUI(replayBus, appName + " (completed)", "/history/" + app.id)
+        ui.start()
+        replayBus.replay()
+        app.desc.appUiUrl = ui.basePath
+        appIdToUI(app.id) = ui
+        webUi.attachUI(ui)
+        return true
+      } catch {
+        case t: Throwable =>
+          logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
+      }
+    } else {
+      logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))
+    }
+    false
   }
 
   /** Generate a new app ID given a app's submission date */

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 01d9f52..30c8ade 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -22,8 +22,9 @@ import javax.servlet.http.HttpServletRequest
 import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkUIContainer
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.ui.{ServerInfo, SparkUI}
+import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -31,7 +32,9 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  * Web UI server for the standalone master.
  */
 private[spark]
-class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
+class MasterWebUI(val master: Master, requestedPort: Int)
+  extends SparkUIContainer("MasterWebUI") with Logging {
+
   val masterActorRef = master.self
   val timeout = AkkaUtils.askTimeout(master.conf)
 
@@ -39,7 +42,6 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
   private val port = requestedPort
   private val applicationPage = new ApplicationPage(this)
   private val indexPage = new IndexPage(this)
-  private var serverInfo: Option[ServerInfo] = None
 
   private val handlers: Seq[ServletContextHandler] = {
     master.masterMetricsSystem.getServletHandlers ++
@@ -57,47 +59,18 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
     )
   }
 
-  def bind() {
+  /** Bind to the HTTP server behind this web interface. */
+  override def bind() {
     try {
       serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf))
       logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>
-        logError("Failed to create Master JettyUtils", e)
+        logError("Failed to create Master web UI", e)
         System.exit(1)
     }
   }
 
-  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
-
-  /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
-  def attachUI(ui: SparkUI) {
-    assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
-    val rootHandler = serverInfo.get.rootHandler
-    for (handler <- ui.handlers) {
-      rootHandler.addHandler(handler)
-      if (!handler.isStarted) {
-        handler.start()
-      }
-    }
-  }
-
-  /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
-  def detachUI(ui: SparkUI) {
-    assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
-    val rootHandler = serverInfo.get.rootHandler
-    for (handler <- ui.handlers) {
-      if (handler.isStarted) {
-        handler.stop()
-      }
-      rootHandler.removeHandler(handler)
-    }
-  }
-
-  def stop() {
-    assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!")
-    serverInfo.get.server.stop()
-  }
 }
 
 private[spark] object MasterWebUI {

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 650f3da..5625a44 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -24,7 +24,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.ui.{JettyUtils, ServerInfo, SparkUI, UIUtils}
+import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -33,15 +33,14 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 private[spark]
 class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
-  extends Logging {
+  extends WebUI("WorkerWebUI") with Logging {
 
   val timeout = AkkaUtils.askTimeout(worker.conf)
 
   private val host = Utils.localHostName()
   private val port = requestedPort.getOrElse(
-    worker.conf.get("worker.ui.port",  WorkerWebUI.DEFAULT_PORT).toInt)
+    worker.conf.getInt("worker.ui.port",  WorkerWebUI.DEFAULT_PORT))
   private val indexPage = new IndexPage(this)
-  private var serverInfo: Option[ServerInfo] = None
 
   private val handlers: Seq[ServletContextHandler] = {
     worker.metricsSystem.getServletHandlers ++
@@ -58,19 +57,18 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
     )
   }
 
-  def bind() {
+  /** Bind to the HTTP server behind this web interface. */
+  override def bind() {
     try {
-      serverInfo = Some(JettyUtils.startJettyServer("0.0.0.0", port, handlers, worker.conf))
+      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf))
       logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>
-        logError("Failed to create Worker JettyUtils", e)
+        logError("Failed to create Worker web UI", e)
         System.exit(1)
     }
   }
 
-  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
-
   private def log(request: HttpServletRequest): String = {
     val defaultBytes = 100 * 1024
 
@@ -187,13 +185,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
     (startByte, endByte)
   }
 
-  def stop() {
-    assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!")
-    serverInfo.get.server.stop()
-  }
 }
 
 private[spark] object WorkerWebUI {
+  val DEFAULT_PORT=8081
   val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
-  val DEFAULT_PORT="8081"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
new file mode 100644
index 0000000..affda13
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * A listener that records the name, user, start time and end time carried by
+ * application start and end events.
+ *
+ * This listener expects to hear events from a single application only. If events
+ * from multiple applications are seen, the behavior is unspecified.
+ */
+private[spark] class ApplicationEventListener extends SparkListener {
+  // Placeholders that remain in effect until the corresponding event has been received
+  var appName = "<Not Started>"
+  var sparkUser = "<Not Started>"
+  var startTime = -1L
+  var endTime = -1L
+
+  /** Whether a start time has been recorded. */
+  def applicationStarted: Boolean = startTime != -1L
+
+  /** Whether an end time has been recorded. */
+  def applicationFinished: Boolean = endTime != -1L
+
+  /**
+   * The time between start and end, or -1 if either one has not been recorded or the
+   * difference is not positive.
+   */
+  def applicationDuration: Long = {
+    if (applicationStarted && applicationFinished) {
+      val elapsed = endTime - startTime
+      if (elapsed > 0) elapsed else -1L
+    } else {
+      -1L
+    }
+  }
+
+  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
+    startTime = applicationStart.time
+    appName = applicationStart.appName
+    sparkUser = applicationStart.sparkUser
+  }
+
+  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+    endTime = applicationEnd.time
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 217f882..b983c16 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.json4s.jackson.JsonMethods._
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.{JsonProtocol, FileLogger}
+import org.apache.spark.util.{FileLogger, JsonProtocol}
 
 /**
  * A SparkListener that logs events to persistent storage.
@@ -36,6 +39,8 @@ import org.apache.spark.util.{JsonProtocol, FileLogger}
 private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   extends SparkListener with Logging {
 
+  import EventLoggingListener._
+
   private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
   private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
@@ -46,17 +51,21 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   private val logger =
     new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
 
-  // Information needed to replay the events logged by this listener later
-  val info = {
-    val compressionCodec = if (shouldCompress) {
-      Some(conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC))
-    } else None
-    EventLoggingInfo(logDir, compressionCodec)
+  /**
+   * Begin logging events.
+   * If compression is used, log a file that indicates which compression library is used.
+   * A file named after the Spark version is then created, followed by the event log file
+   * itself, which becomes the file that subsequent events are written to.
+   */
+  def start() {
+    logInfo("Logging events to %s".format(logDir))
+    if (shouldCompress) {
+      // The codec name is recorded in the name of the file created here
+      val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+      logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
+    }
+    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
+    // NOTE(review): logger.fileIndex is read before this call, and FileLogger.newFile appears to
+    // increment it on every call, so the suffix depends on how many files were created above
+    // (i.e. on whether compression is enabled) -- confirm this is intended.
+    logger.newFile(LOG_PREFIX + logger.fileIndex)
   }
 
-  logInfo("Logging events to %s".format(logDir))
-
-  /** Log the event as JSON */
+  /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
     val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
     logger.logLine(eventJson)
@@ -90,9 +99,118 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
     logEvent(event, flushLogger = true)
   override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
     logEvent(event, flushLogger = true)
+  override def onApplicationStart(event: SparkListenerApplicationStart) =
+    logEvent(event, flushLogger = true)
+  override def onApplicationEnd(event: SparkListenerApplicationEnd) =
+    logEvent(event, flushLogger = true)
+
+  /**
+   * Stop logging events.
+   * In addition, create an empty special file to indicate application completion.
+   */
+  def stop() = {
+    // The completion marker must be created while the logger is still usable,
+    // so it is requested before the logger is stopped.
+    logger.newFile(APPLICATION_COMPLETE)
+    logger.stop()
+  }
+}
+
+private[spark] object EventLoggingListener extends Logging {
+  val LOG_PREFIX = "EVENT_LOG_"
+  val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
+  val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
+  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+
+  // A cache for compression codecs to avoid creating the same codec many times
+  private val codecMap = new mutable.HashMap[String, CompressionCodec]
+
+  /** Whether the given file name starts with the event log prefix (LOG_PREFIX). */
+  def isEventLogFile(fileName: String): Boolean = fileName.startsWith(LOG_PREFIX)
+
+  /** Whether the given file name starts with the Spark version prefix (SPARK_VERSION_PREFIX). */
+  def isSparkVersionFile(fileName: String): Boolean = fileName.startsWith(SPARK_VERSION_PREFIX)
+
+  /** Whether the given file name starts with the codec prefix (COMPRESSION_CODEC_PREFIX). */
+  def isCompressionCodecFile(fileName: String): Boolean =
+    fileName.startsWith(COMPRESSION_CODEC_PREFIX)
+
+  /** Whether the given file name is exactly the application completion marker. */
+  def isApplicationCompleteFile(fileName: String): Boolean = APPLICATION_COMPLETE == fileName
+
+  /**
+   * Extract the Spark version encoded in the name of a Spark version file.
+   *
+   * @param fileName name (not the full path) of a file in an event log directory
+   * @return the file name with the leading SPARK_VERSION_PREFIX removed, or an empty string
+   *         if the name does not start with that prefix
+   */
+  def parseSparkVersion(fileName: String): String = {
+    if (isSparkVersionFile(fileName)) {
+      // Remove only the leading prefix. String.replaceAll would interpret the prefix as a
+      // regular expression and also delete any later occurrence inside the remainder.
+      fileName.stripPrefix(SPARK_VERSION_PREFIX)
+    } else ""
+  }
+
+  /**
+   * Extract the compression codec name encoded in the name of a compression codec file.
+   *
+   * @param fileName name (not the full path) of a file in an event log directory
+   * @return the file name with the leading COMPRESSION_CODEC_PREFIX removed, or an empty
+   *         string if the name does not start with that prefix
+   */
+  def parseCompressionCodec(fileName: String): String = {
+    if (isCompressionCodecFile(fileName)) {
+      // Remove only the leading prefix. String.replaceAll would interpret the prefix as a
+      // regular expression and also delete any later occurrence inside the codec name.
+      fileName.stripPrefix(COMPRESSION_CODEC_PREFIX)
+    } else ""
+  }
+
+  /**
+   * Parse the event logging information associated with the logs in the given directory.
+   *
+   * Specifically, this looks for event log files, the Spark version file, the compression
+   * codec file (if event logs are compressed), and the application completion file (if the
+   * application has run to completion).
+   *
+   * @param logDir directory to inspect; only its direct, non-directory children are considered
+   * @param fileSystem file system used to list the contents of logDir
+   * @return the parsed information, or EventLoggingInfo.empty if an exception is thrown while
+   *         listing or interpreting the directory contents
+   */
+  def parseLoggingInfo(logDir: Path, fileSystem: FileSystem): EventLoggingInfo = {
+    try {
+      // A null listing is treated the same way as an empty directory
+      val filePaths = Option(fileSystem.listStatus(logDir))
+        .map { statuses => statuses.filter(!_.isDir).map(_.getPath).toSeq }
+        .getOrElse(Seq[Path]())
+      if (filePaths.isEmpty) {
+        logWarning("No files found in logging directory %s".format(logDir))
+      }
+      EventLoggingInfo(
+        logPaths = filePaths.filter { path => isEventLogFile(path.getName) },
+        sparkVersion = filePaths
+          .find { path => isSparkVersionFile(path.getName) }
+          .map { path => parseSparkVersion(path.getName) }
+          .getOrElse("<Unknown>"),
+        compressionCodec = filePaths
+          .find { path => isCompressionCodecFile(path.getName) }
+          .map { path =>
+            val codec = parseCompressionCodec(path.getName)
+            // Build a SparkConf and instantiate the codec only when it is not cached yet
+            codecMap.getOrElseUpdate(codec, {
+              val conf = new SparkConf
+              conf.set("spark.io.compression.codec", codec)
+              CompressionCodec.createCodec(conf)
+            })
+          },
+        applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) }
+      )
+    } catch {
+      // Catch Exception rather than Throwable so that JVM errors such as OutOfMemoryError
+      // propagate instead of being reported as an unreadable log directory.
+      case e: Exception =>
+        logError("Exception in parsing logging info from directory %s".format(logDir), e)
+        EventLoggingInfo.empty
+    }
+  }
 
-  def stop() = logger.stop()
+  /**
+   * Convenience overload that accepts the logging directory as a string, converts it to a
+   * Path and delegates to the Path-based variant.
+   */
+  def parseLoggingInfo(logDir: String, fileSystem: FileSystem): EventLoggingInfo = {
+    val logDirPath = new Path(logDir)
+    parseLoggingInfo(logDirPath, fileSystem)
+  }
 }
 
-// If compression is not enabled, compressionCodec is None
-private[spark] case class EventLoggingInfo(logDir: String, compressionCodec: Option[String])
+
+/**
+ * Description of the event logs written for one application, as needed to process them.
+ *
+ * @param logPaths paths of the event log files
+ * @param sparkVersion Spark version recorded alongside the logs
+ * @param compressionCodec codec to read the logs with, if one was recorded
+ * @param applicationComplete whether the application completion marker was found
+ */
+private[spark] case class EventLoggingInfo(
+    logPaths: Seq[Path],
+    sparkVersion: String,
+    compressionCodec: Option[CompressionCodec],
+    applicationComplete: Boolean = false)
+
+private[spark] object EventLoggingInfo {
+  /** Information with no log files, an unknown Spark version, no codec, and not complete. */
+  def empty = EventLoggingInfo(
+    logPaths = Seq[Path](),
+    sparkVersion = "<Unknown>",
+    compressionCodec = None,
+    applicationComplete = false)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index db76178..b03665f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler
 
 import java.io.InputStream
-import java.net.URI
 
 import scala.io.Source
 
@@ -26,63 +25,47 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 import org.apache.hadoop.fs.{Path, FileSystem}
 import org.json4s.jackson.JsonMethods._
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.Logging
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.JsonProtocol
 
 /**
- * An EventBus that replays logged events from persisted storage
+ * A SparkListenerBus that replays logged events from persisted storage.
+ *
+ * This class expects files to be appropriately prefixed as specified in EventLoggingListener.
+ * There exists a one-to-one mapping between ReplayListenerBus and event logging applications.
  */
-private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus with Logging {
-  private val compressed = conf.getBoolean("spark.eventLog.compress", false)
+private[spark] class ReplayListenerBus(
+    logPaths: Seq[Path],
+    fileSystem: FileSystem,
+    compressionCodec: Option[CompressionCodec])
+  extends SparkListenerBus with Logging {
 
-  // Only used if compression is enabled
-  private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+  private var replayed = false
 
-  /**
-   * Return a list of paths representing log files in the given directory.
-   */
-  private def getLogFilePaths(logDir: String, fileSystem: FileSystem): Array[Path] = {
-    val path = new Path(logDir)
-    if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) {
-      logWarning("Log path provided is not a valid directory: %s".format(logDir))
-      return Array[Path]()
-    }
-    val logStatus = fileSystem.listStatus(path)
-    if (logStatus == null || !logStatus.exists(!_.isDir)) {
-      logWarning("Log path provided contains no log files: %s".format(logDir))
-      return Array[Path]()
-    }
-    logStatus.filter(!_.isDir).map(_.getPath).sortBy(_.getName)
+  if (logPaths.length == 0) {
+    logWarning("Log path provided contains no log files.")
   }
 
   /**
    * Replay each event in the order maintained in the given logs.
+   * This must not be called more than once.
    */
-  def replay(logDir: String): Boolean = {
-    val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
-    val logPaths = getLogFilePaths(logDir, fileSystem)
-    if (logPaths.length == 0) {
-      return false
-    }
-
+  def replay() {
+    assert(!replayed, "ReplayListenerBus cannot replay events more than once")
     logPaths.foreach { path =>
       // Keep track of input streams at all levels to close them later
       // This is necessary because an exception can occur in between stream initializations
       var fileStream: Option[InputStream] = None
       var bufferedStream: Option[InputStream] = None
       var compressStream: Option[InputStream] = None
-      var currentLine = ""
+      var currentLine = "<not started>"
       try {
-        currentLine = "<not started>"
         fileStream = Some(fileSystem.open(path))
         bufferedStream = Some(new FastBufferedInputStream(fileStream.get))
-        compressStream =
-          if (compressed) {
-            Some(compressionCodec.compressedInputStream(bufferedStream.get))
-          } else bufferedStream
+        compressStream = Some(wrapForCompression(bufferedStream.get))
 
-        // Parse each line as an event and post it to all attached listeners
+        // Parse each line as an event and post the event to all attached listeners
         val lines = Source.fromInputStream(compressStream.get).getLines()
         lines.foreach { line =>
           currentLine = line
@@ -98,7 +81,11 @@ private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus
         compressStream.foreach(_.close())
       }
     }
-    fileSystem.close()
-    true
+    replayed = true
+  }
+
+  /**
+   * Return a stream to read events from: the given stream wrapped by the compression codec
+   * if one was provided, or the given stream itself otherwise.
+   */
+  private def wrapForCompression(stream: InputStream): InputStream = {
+    compressionCodec match {
+      case Some(codec) => codec.compressedInputStream(stream)
+      case None => stream
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index ced2035..378cf1a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -75,6 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
+/**
+ * Event posted when an application starts.
+ *
+ * @param appName name of the application
+ * @param time time value associated with the start of the application
+ * @param sparkUser user associated with the application
+ */
+@DeveloperApi
+case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
+  extends SparkListenerEvent
+
+/**
+ * Event posted when an application ends.
+ *
+ * @param time time value associated with the end of the application
+ */
+@DeveloperApi
+case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
+
 /** An event used in the listener to shutdown the listener daemon thread. */
 private[spark] case object SparkListenerShutdown extends SparkListenerEvent
 
@@ -141,6 +146,16 @@ trait SparkListener {
    * Called when an RDD is manually unpersisted by the application
    */
   def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }
+
+  /**
+   * Called when the application starts. The default implementation does nothing.
+   *
+   * @param applicationStart event carrying the application name, a time value and the Spark user
+   */
+  def onApplicationStart(applicationStart: SparkListenerApplicationStart) { }
+
+  /**
+   * Called when the application ends. The default implementation does nothing.
+   *
+   * @param applicationEnd event carrying a time value for the end of the application
+   */
+  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 729e120..d6df193 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -61,6 +61,10 @@ private[spark] trait SparkListenerBus {
         sparkListeners.foreach(_.onBlockManagerRemoved(blockManagerRemoved))
       case unpersistRDD: SparkListenerUnpersistRDD =>
         sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD))
+      case applicationStart: SparkListenerApplicationStart =>
+        sparkListeners.foreach(_.onApplicationStart(applicationStart))
+      case applicationEnd: SparkListenerApplicationEnd =>
+        sparkListeners.foreach(_.onApplicationEnd(applicationEnd))
       case SparkListenerShutdown =>
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 25b7472..936e9db 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SparkDeploySchedulerBackend(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sparkHome, sc.ui.appUIAddress, sc.eventLoggingInfo)
+      sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
index 5554868..132502b 100644
--- a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
@@ -23,6 +23,6 @@ import java.io.File
  * References a particular segment of a file (potentially the entire file),
  * based off an offset and a length.
  */
-private[spark] class FileSegment(val file: File, val offset: Long, val length : Long) {
+private[spark] class FileSegment(val file: File, val offset: Long, val length: Long) {
   override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index f53df7f..b8e6e15 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -34,23 +34,22 @@ private[spark] class SparkUI(
     val sc: SparkContext,
     conf: SparkConf,
     val listenerBus: SparkListenerBus,
-    val appName: String,
+    var appName: String,
     val basePath: String = "")
-  extends Logging {
+  extends WebUI("SparkUI") with Logging {
 
   def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName)
-  def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
-    this(null, conf, listenerBus, appName, basePath)
+  def this(listenerBus: SparkListenerBus, appName: String, basePath: String) =
+    this(null, new SparkConf, listenerBus, appName, basePath)
 
   // If SparkContext is not provided, assume the associated application is not live
   val live = sc != null
 
   val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
 
-  private val bindHost = Utils.localHostName()
-  private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
-  private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
-  private var serverInfo: Option[ServerInfo] = None
+  private val localHost = Utils.localHostName()
+  private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
+  private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
 
   private val storage = new BlockManagerUI(this)
   private val jobs = new JobProgressUI(this)
@@ -77,20 +76,10 @@ private[spark] class SparkUI(
   // Maintain executor storage status through Spark events
   val storageStatusListener = new StorageStatusListener
 
-  /** Bind the HTTP server which backs this web interface */
-  def bind() {
-    try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf))
-      logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort))
-    } catch {
-      case e: Exception =>
-        logError("Failed to create Spark JettyUtils", e)
-        System.exit(1)
-    }
+  def setAppName(name: String) {
+    appName = name
   }
 
-  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
-
   /** Initialize all components of the server */
   def start() {
     storage.start()
@@ -106,9 +95,21 @@ private[spark] class SparkUI(
     listenerBus.addListener(exec.listener)
   }
 
-  def stop() {
-    assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not bound to a server!")
-    serverInfo.get.server.stop()
+  /**
+   * Bind to the HTTP server behind this web interface.
+   *
+   * Starts a Jetty server on "0.0.0.0", requesting the configured UI port, and records the
+   * result in serverInfo. If an exception is thrown while starting the server, the error is
+   * logged and the JVM exits with status 1.
+   */
+  override def bind() {
+    try {
+      // Use the conf given to the constructor instead of sc.conf: sc is null when this UI
+      // does not belong to a live application, and dereferencing it here would throw.
+      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+      logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort))
+    } catch {
+      case e: Exception =>
+        logError("Failed to create Spark web UI", e)
+        System.exit(1)
+    }
+  }
+
+  /** Stop the server behind this web interface. Only valid after bind(). */
+  override def stop() {
+    super.stop()
     logInfo("Stopped Spark Web UI at %s".format(appUIAddress))
   }
 
@@ -117,6 +118,6 @@ private[spark] class SparkUI(
 }
 
 private[spark] object SparkUI {
-  val DEFAULT_PORT = "4040"
+  val DEFAULT_PORT = 4040
   val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index a7b872f..2cc7582 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -20,6 +20,25 @@ package org.apache.spark.ui
 import java.text.SimpleDateFormat
 import java.util.Date
 
+/**
+ * Base class for web interfaces backed by an HTTP server.
+ *
+ * @param name label used in the error message raised when stop() is called before bind()
+ */
+private[spark] abstract class WebUI(name: String) {
+  // Holds the bound server, if any; expected to be set by bind() implementations
+  protected var serverInfo: Option[ServerInfo] = None
+
+  /**
+   * Bind to the HTTP server behind this web interface.
+   * This default implementation does nothing; overriding implementations should set serverInfo.
+   */
+  def bind() { }
+
+  /** Return the port recorded in serverInfo, or -1 if no server has been recorded yet. */
+  def boundPort: Int = serverInfo match {
+    case Some(info) => info.boundPort
+    case None => -1
+  }
+
+  /** Stop the server behind this web interface. Asserts that a server has been recorded. */
+  def stop() {
+    val isBound = serverInfo.isDefined
+    assert(isBound, "Attempted to stop %s before binding to a server!".format(name))
+    val info = serverInfo.get
+    info.server.stop()
+  }
+}
+
 /**
  * Utilities used throughout the web UI.
  */
@@ -45,6 +64,6 @@ private[spark] object WebUI {
       return "%.0f min".format(minutes)
     }
     val hours = minutes / 60
-    return "%.1f h".format(hours)
+    "%.1f h".format(hours)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index 23e90c3..33df971 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -29,10 +29,11 @@ import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.Page.Environment
 
 private[ui] class EnvironmentUI(parent: SparkUI) {
-  private val appName = parent.appName
   private val basePath = parent.basePath
   private var _listener: Option[EnvironmentListener] = None
 
+  private def appName = parent.appName
+
   lazy val listener = _listener.get
 
   def start() {

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index 031ed88..77a38a1 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -33,10 +33,11 @@ import org.apache.spark.ui.{SparkUI, UIUtils}
 import org.apache.spark.util.Utils
 
 private[ui] class ExecutorsUI(parent: SparkUI) {
-  private val appName = parent.appName
   private val basePath = parent.basePath
   private var _listener: Option[ExecutorsListener] = None
 
+  private def appName = parent.appName
+
   lazy val listener = _listener.get
 
   def start() {

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 70d62b6..f811aff 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -27,13 +27,14 @@ import org.apache.spark.ui.UIUtils
 
 /** Page showing list of all ongoing and recently finished stages and pools */
 private[ui] class IndexPage(parent: JobProgressUI) {
-  private val appName = parent.appName
   private val basePath = parent.basePath
   private val live = parent.live
   private val sc = parent.sc
   private lazy val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
 
+  private def appName = parent.appName
+
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val activeStages = listener.activeStages.values.toSeq

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index b2c6738..ad1a12c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -29,7 +29,6 @@ import org.apache.spark.util.Utils
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
 private[ui] class JobProgressUI(parent: SparkUI) {
-  val appName = parent.appName
   val basePath = parent.basePath
   val live = parent.live
   val sc = parent.sc
@@ -42,6 +41,8 @@ private[ui] class JobProgressUI(parent: SparkUI) {
   private val poolPage = new PoolPage(this)
   private var _listener: Option[JobProgressListener] = None
 
+  def appName = parent.appName
+
   def start() {
     val conf = if (live) sc.conf else new SparkConf
     _listener = Some(new JobProgressListener(conf))

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index bd33182..3638e60 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -27,12 +27,13 @@ import org.apache.spark.ui.UIUtils
 
 /** Page showing specific pool details */
 private[ui] class PoolPage(parent: JobProgressUI) {
-  private val appName = parent.appName
   private val basePath = parent.basePath
   private val live = parent.live
   private val sc = parent.sc
   private lazy val listener = parent.listener
 
+  private def appName = parent.appName
+
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val poolName = request.getParameter("poolname")

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 0c55f2e..0bcbd74 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -28,10 +28,11 @@ import org.apache.spark.util.{Utils, Distribution}
 
 /** Page showing statistics and task list for a given stage */
 private[ui] class StagePage(parent: JobProgressUI) {
-  private val appName = parent.appName
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
 
+  private def appName = parent.appName
+
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index a7b24ff..16996a2 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -30,7 +30,6 @@ import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
 
 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[ui] class BlockManagerUI(parent: SparkUI) {
-  val appName = parent.appName
   val basePath = parent.basePath
 
   private val indexPage = new IndexPage(this)
@@ -39,6 +38,8 @@ private[ui] class BlockManagerUI(parent: SparkUI) {
 
   lazy val listener = _listener.get
 
+  def appName = parent.appName
+
   def start() {
     _listener = Some(new BlockManagerListener(parent.storageStatusListener))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
index 0fa461e..4f6acc3 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -28,10 +28,11 @@ import org.apache.spark.util.Utils
 
 /** Page showing list of RDD's currently stored in the cluster */
 private[ui] class IndexPage(parent: BlockManagerUI) {
-  private val appName = parent.appName
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
 
+  private def appName = parent.appName
+
   def render(request: HttpServletRequest): Seq[Node] = {
 
     val rdds = listener.rddInfoList

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 3f42eba..75ee997 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -28,10 +28,11 @@ import org.apache.spark.util.Utils
 
 /** Page showing storage details for a given RDD */
 private[ui] class RDDPage(parent: BlockManagerUI) {
-  private val appName = parent.appName
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
 
+  private def appName = parent.appName
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val rddId = request.getParameter("id").toInt
     val storageStatusList = listener.storageStatusList

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/util/FileLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index b5f2ec6..0080a8b 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -49,7 +49,7 @@ private[spark] class FileLogger(
   }
 
   private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
-  private var fileIndex = 0
+  var fileIndex = 0
 
   // Only used if compression is enabled
   private lazy val compressionCodec = CompressionCodec.createCodec(conf)
@@ -57,10 +57,9 @@ private[spark] class FileLogger(
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
 
-  private var writer: Option[PrintWriter] = {
-    createLogDir()
-    Some(createWriter())
-  }
+  private var writer: Option[PrintWriter] = None
+
+  createLogDir()
 
   /**
    * Create a logging directory with the given path.
@@ -84,8 +83,8 @@ private[spark] class FileLogger(
   /**
    * Create a new writer for the file identified by the given path.
    */
-  private def createWriter(): PrintWriter = {
-    val logPath = logDir + "/" + fileIndex
+  private def createWriter(fileName: String): PrintWriter = {
+    val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -147,13 +146,17 @@ private[spark] class FileLogger(
   }
 
   /**
-   * Start a writer for a new file if one does not already exit.
+   * Start a writer for a new file, closing the existing one if it exists.
+   * @param fileName Name of the new file, defaulting to the file index if not provided.
    */
-  def start() {
-    writer.getOrElse {
-      fileIndex += 1
-      writer = Some(createWriter())
+  def newFile(fileName: String = "") {
+    fileIndex += 1
+    writer.foreach(_.close())
+    val name = fileName match {
+      case "" => fileIndex.toString
+      case _ => fileName
     }
+    writer = Some(createWriter(name))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1965489..d990fd4 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -62,6 +62,10 @@ private[spark] object JsonProtocol {
         blockManagerRemovedToJson(blockManagerRemoved)
       case unpersistRDD: SparkListenerUnpersistRDD =>
         unpersistRDDToJson(unpersistRDD)
+      case applicationStart: SparkListenerApplicationStart =>
+        applicationStartToJson(applicationStart)
+      case applicationEnd: SparkListenerApplicationEnd =>
+        applicationEndToJson(applicationEnd)
 
       // Not used, but keeps compiler happy
       case SparkListenerShutdown => JNothing
@@ -157,6 +161,18 @@ private[spark] object JsonProtocol {
     ("RDD ID" -> unpersistRDD.rddId)
   }
 
+  def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
+    ("Event" -> Utils.getFormattedClassName(applicationStart)) ~
+    ("App Name" -> applicationStart.appName) ~
+    ("Timestamp" -> applicationStart.time) ~
+    ("User" -> applicationStart.sparkUser)
+  }
+
+  def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
+    ("Event" -> Utils.getFormattedClassName(applicationEnd)) ~
+    ("Timestamp" -> applicationEnd.time)
+  }
+
 
   /** ------------------------------------------------------------------- *
    * JSON serialization methods for classes SparkListenerEvents depend on |
@@ -346,6 +362,8 @@ private[spark] object JsonProtocol {
     val blockManagerAdded = Utils.getFormattedClassName(SparkListenerBlockManagerAdded)
     val blockManagerRemoved = Utils.getFormattedClassName(SparkListenerBlockManagerRemoved)
     val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD)
+    val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart)
+    val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd)
 
     (json \ "Event").extract[String] match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -359,6 +377,8 @@ private[spark] object JsonProtocol {
       case `blockManagerAdded` => blockManagerAddedFromJson(json)
       case `blockManagerRemoved` => blockManagerRemovedFromJson(json)
       case `unpersistRDD` => unpersistRDDFromJson(json)
+      case `applicationStart` => applicationStartFromJson(json)
+      case `applicationEnd` => applicationEndFromJson(json)
     }
   }
 
@@ -430,6 +450,17 @@ private[spark] object JsonProtocol {
     SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int])
   }
 
+  def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
+    val appName = (json \ "App Name").extract[String]
+    val time = (json \ "Timestamp").extract[Long]
+    val sparkUser = (json \ "User").extract[String]
+    SparkListenerApplicationStart(appName, time, sparkUser)
+  }
+
+  def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
+    SparkListenerApplicationEnd((json \ "Timestamp").extract[Long])
+  }
+
 
   /** --------------------------------------------------------------------- *
    * JSON deserialization methods for classes SparkListenerEvents depend on |

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 59da51f..166f48c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.SortedSet
 import scala.io.Source
 import scala.reflect.ClassTag
 
@@ -1022,4 +1021,11 @@ private[spark] object Utils extends Logging {
   def getHadoopFileSystem(path: URI): FileSystem = {
     FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
   }
+
+  /**
+   * Return a Hadoop FileSystem with the scheme encoded in the given path.
+   */
+  def getHadoopFileSystem(path: String): FileSystem = {
+    getHadoopFileSystem(new URI(path))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79820fe8/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index beac656..8c06a2d 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success}
+import org.apache.spark.{LocalSparkContext, SparkConf, Success}
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils