Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/12 08:34:04 UTC

[1/3] [SPARK-1386] Web UI for Spark Streaming

Repository: spark
Updated Branches:
  refs/heads/master 165e06a74 -> 6aa08c39c


http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
new file mode 100644
index 0000000..56429f6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import scala.collection.mutable
+
+import org.apache.spark.ui._
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
+
+/** Web UI showing storage status of all RDDs in the given SparkContext. */
+private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {
+  val appName = parent.appName
+  val basePath = parent.basePath
+  val listener = new StorageListener(parent.storageStatusListener)
+
+  attachPage(new StoragePage(this))
+  attachPage(new RddPage(this))
+  parent.registerListener(listener)
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the BlockManagerUI
+ */
+private[ui] class StorageListener(storageStatusListener: StorageStatusListener)
+  extends SparkListener {
+
+  private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
+
+  def storageStatusList = storageStatusListener.storageStatusList
+
+  /** Filter RDD info to include only those with cached partitions */
+  def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
+
+  /** Update each RDD's info to reflect any updates to the RDD's storage status */
+  private def updateRDDInfo() {
+    val rddInfos = _rddInfoMap.values.toSeq
+    val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
+    updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
+  }
+
+  /**
+   * Assumes the storage status list is fully up-to-date. This implies the corresponding
+   * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
+   */
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+    val metrics = taskEnd.taskMetrics
+    if (metrics != null && metrics.updatedBlocks.isDefined) {
+      updateRDDInfo()
+    }
+  }
+
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
+    val rddInfo = stageSubmitted.stageInfo.rddInfo
+    _rddInfoMap.getOrElseUpdate(rddInfo.id, rddInfo)
+  }
+
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
+    // Remove all partitions that are no longer cached
+    _rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 }
+  }
+
+  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
+    updateRDDInfo()
+  }
+}

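The StorageListener above illustrates the listener pattern this patch leans on throughout: a SparkListener keeps synchronized mutable state that is updated as scheduler events arrive on the listener bus. A minimal standalone sketch of that pattern (not part of this patch; the registration call at the end is an assumption):

  import scala.collection.mutable
  import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

  class StageCountListener extends SparkListener {
    private val completedStages = mutable.Set[Int]()
    // Record every completed stage id; synchronized because events arrive asynchronously.
    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
      completedStages += stageCompleted.stageInfo.stageId
    }
    def numCompletedStages: Int = synchronized { completedStages.size }
  }
  // sc.addSparkListener(new StageCountListener())  // assumed registration on a SparkContext
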
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f2396f7..465835e 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -88,30 +88,27 @@ private[spark] object JsonProtocol {
 
   def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
     val taskInfo = taskStart.taskInfo
-    val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
     ("Event" -> Utils.getFormattedClassName(taskStart)) ~
     ("Stage ID" -> taskStart.stageId) ~
-    ("Task Info" -> taskInfoJson)
+    ("Task Info" -> taskInfoToJson(taskInfo))
   }
 
   def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
     val taskInfo = taskGettingResult.taskInfo
-    val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
     ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
-    ("Task Info" -> taskInfoJson)
+    ("Task Info" -> taskInfoToJson(taskInfo))
   }
 
   def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
     val taskEndReason = taskEndReasonToJson(taskEnd.reason)
     val taskInfo = taskEnd.taskInfo
-    val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
     val taskMetrics = taskEnd.taskMetrics
     val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
     ("Event" -> Utils.getFormattedClassName(taskEnd)) ~
     ("Stage ID" -> taskEnd.stageId) ~
     ("Task Type" -> taskEnd.taskType) ~
     ("Task End Reason" -> taskEndReason) ~
-    ("Task Info" -> taskInfoJson) ~
+    ("Task Info" -> taskInfoToJson(taskInfo)) ~
     ("Task Metrics" -> taskMetricsJson)
   }
 
@@ -505,6 +502,9 @@ private[spark] object JsonProtocol {
   }
 
   def taskMetricsFromJson(json: JValue): TaskMetrics = {
+    if (json == JNothing) {
+      return TaskMetrics.empty
+    }
     val metrics = new TaskMetrics
     metrics.hostname = (json \ "Host Name").extract[String]
     metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]

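The new guard in taskMetricsFromJson means a missing "Task Metrics" field (which json4s surfaces as JNothing) deserializes to TaskMetrics.empty instead of failing during extraction. A hedged, self-contained json4s sketch of the same pattern (hypothetical Metrics type, not Spark's actual code):

  import org.json4s._
  import org.json4s.jackson.JsonMethods._

  object JNothingGuardSketch {
    implicit val formats: Formats = DefaultFormats
    case class Metrics(hostname: String, execTimeMs: Long)
    val emptyMetrics = Metrics("", 0L)

    def metricsFromJson(json: JValue): Metrics = {
      if (json == JNothing) {
        return emptyMetrics  // mirrors the TaskMetrics.empty fallback above
      }
      Metrics((json \ "Host Name").extract[String], (json \ "Executor Time").extract[Long])
    }

    // A lookup that misses returns JNothing, so this yields emptyMetrics:
    // metricsFromJson(parse("""{"Event":"TaskEnd"}""") \ "Task Metrics")
  }
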
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/test/scala/org/apache/spark/SparkUISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkUISuite.scala b/core/src/test/scala/org/apache/spark/SparkUISuite.scala
deleted file mode 100644
index d0d119c..0000000
--- a/core/src/test/scala/org/apache/spark/SparkUISuite.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.net.URI
-
-import org.scalatest.FunSuite
-
-class SparkUISuite extends FunSuite with SharedSparkContext {
-
-  test("verify appUIAddress contains the scheme") {
-    val uiAddress = sc.ui.appUIAddress
-    assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
-  }
-
-  test("verify appUIAddress contains the port") {
-    val splitUIAddress = sc.ui.appUIAddress.split(':')
-    assert(splitUIAddress(2).toInt == sc.ui.boundPort)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 2f9739f..b85c483 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -18,16 +18,81 @@
 package org.apache.spark.ui
 
 import java.net.ServerSocket
+import javax.servlet.http.HttpServletRequest
 
+import scala.io.Source
 import scala.util.{Failure, Success, Try}
 
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.servlet.ServletContextHandler
 import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.LocalSparkContext._
+import scala.xml.Node
 
 class UISuite extends FunSuite {
+
+  test("basic ui visibility") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      // test if the ui is visible, and all the expected tabs are visible
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        val html = Source.fromURL(sc.ui.appUIAddress).mkString
+        assert(!html.contains("random data that should not be present"))
+        assert(html.toLowerCase.contains("stages"))
+        assert(html.toLowerCase.contains("storage"))
+        assert(html.toLowerCase.contains("environment"))
+        assert(html.toLowerCase.contains("executors"))
+      }
+    }
+  }
+
+  test("visibility at localhost:4040") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      // test if visible from http://localhost:4040
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        val html = Source.fromURL("http://localhost:4040").mkString
+        assert(html.toLowerCase.contains("stages"))
+      }
+    }
+  }
+
+  test("attaching a new tab") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val sparkUI = sc.ui
+
+      val newTab = new WebUITab(sparkUI, "foo") {
+        attachPage(new WebUIPage("") {
+          def render(request: HttpServletRequest): Seq[Node] = {
+            <b>"html magic"</b>
+          }
+        })
+      }
+      sparkUI.attachTab(newTab)
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        val html = Source.fromURL(sc.ui.appUIAddress).mkString
+        assert(!html.contains("random data that should not be present"))
+
+        // check whether new page exists
+        assert(html.toLowerCase.contains("foo"))
+
+        // check whether other pages still exist
+        assert(html.toLowerCase.contains("stages"))
+        assert(html.toLowerCase.contains("storage"))
+        assert(html.toLowerCase.contains("environment"))
+        assert(html.toLowerCase.contains("executors"))
+      }
+
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
+        // check whether new page exists
+        assert(html.contains("magic"))
+      }
+    }
+  }
+
   test("jetty port increases under contention") {
     val startPort = 4040
     val server = new Server(startPort)
@@ -60,4 +125,18 @@ class UISuite extends FunSuite {
       case Failure(e) =>
     }
   }
+
+  test("verify appUIAddress contains the scheme") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val uiAddress = sc.ui.appUIAddress
+      assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
+    }
+  }
+
+  test("verify appUIAddress contains the port") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val splitUIAddress = sc.ui.appUIAddress.split(':')
+      assert(splitUIAddress(2).toInt == sc.ui.boundPort)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f75297a..16470bb 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -523,8 +523,8 @@ class JsonProtocolSuite extends FunSuite {
       700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics":
       {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks":
       [{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status":
-      {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
-      "Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}}
+      {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,
+      "Deserialized":false,"Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}}
     """
 
   private val jobStartJsonString =

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/project/MimaBuild.scala
----------------------------------------------------------------------
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index 5ea4817..9cb31d7 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -60,6 +60,7 @@ object MimaBuild {
           Seq(
             excludePackage("org.apache.spark.api.java"),
             excludePackage("org.apache.spark.streaming.api.java"),
+            excludePackage("org.apache.spark.streaming.scheduler"),
             excludePackage("org.apache.spark.mllib")
           ) ++
           excludeSparkClass("rdd.ClassTags") ++
@@ -70,7 +71,12 @@ object MimaBuild {
           excludeSparkClass("mllib.regression.LassoWithSGD") ++
           excludeSparkClass("mllib.regression.LinearRegressionWithSGD") ++
           excludeSparkClass("streaming.dstream.NetworkReceiver") ++
-          excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor")
+          excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor") ++
+          excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator") ++
+          excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator#Block") ++
+          excludeSparkClass("streaming.dstream.ReportError") ++
+          excludeSparkClass("streaming.dstream.ReportBlock") ++
+          excludeSparkClass("streaming.dstream.DStream")
         case _ => Seq()
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a4e236c..ff5d0aa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -17,29 +17,28 @@
 
 package org.apache.spark.streaming
 
-import scala.collection.mutable.Queue
-import scala.collection.Map
-import scala.reflect.ClassTag
-
 import java.io.InputStream
 import java.util.concurrent.atomic.AtomicInteger
 
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
+import scala.collection.Map
+import scala.collection.mutable.Queue
+import scala.reflect.ClassTag
+
+import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receivers._
 import org.apache.spark.streaming.scheduler._
-import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.ui.StreamingTab
+import org.apache.spark.util.MetadataCleaner
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -158,6 +157,8 @@ class StreamingContext private[streaming] (
 
   private[streaming] val waiter = new ContextWaiter
 
+  private[streaming] val uiTab = new StreamingTab(this)
+
   /** Enumeration to identify current state of the StreamingContext */
   private[streaming] object StreamingContextState extends Enumeration {
     type CheckpointState = Value

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index d043200..a7e5215 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -353,15 +353,6 @@ abstract class DStream[T: ClassTag] (
     dependencies.foreach(_.clearMetadata(time))
   }
 
-  /* Adds metadata to the Stream while it is running.
-   * This method should be overwritten by sublcasses of InputDStream.
-   */
-  private[streaming] def addMetadata(metadata: Any) {
-    if (metadata != null) {
-      logInfo("Dropping Metadata: " + metadata.toString)
-    }
-  }
-
   /**
    * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
    * this stream. This is an internal method that should not be called directly. This is

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index d19a635..5a24970 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -17,24 +17,23 @@
 
 package org.apache.spark.streaming.dstream
 
-import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
 import java.nio.ByteBuffer
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.Await
-import scala.concurrent.duration._
 import scala.reflect.ClassTag
 
-import akka.actor.{Props, Actor}
+import akka.actor.{Actor, Props}
 import akka.pattern.ask
 
-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
-import org.apache.spark.streaming._
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.rdd.{RDD, BlockRDD}
+import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
+import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -49,8 +48,10 @@ import org.apache.spark.util.AkkaUtils
 abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
 
-  // This is an unique identifier that is used to match the network receiver with the
-  // corresponding network input stream.
+  /** Keeps information about all received blocks */
+  private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
+
+  /** This is a unique identifier for the network input stream. */
   val id = ssc.getNewNetworkStreamId()
 
   /**
@@ -65,25 +66,44 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
 
   def stop() {}
 
+  /** Asks NetworkInputTracker for received data blocks and generates RDDs with them. */
   override def compute(validTime: Time): Option[RDD[T]] = {
     // If this is called for any time before the start time of the context,
     // then this returns an empty RDD. This may happen when recovering from a
     // master failure
     if (validTime >= graph.startTime) {
-      val blockIds = ssc.scheduler.networkInputTracker.getBlocks(id, validTime)
+      val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
+      receivedBlockInfo(validTime) = blockInfo
+      val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
       Some(new BlockRDD[T](ssc.sc, blockIds))
     } else {
       Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
     }
   }
+
+  /** Get information on received blocks. */
+  private[streaming] def getReceivedBlockInfo(time: Time) = {
+    receivedBlockInfo(time)
+  }
+
+  /**
+   * Clear metadata that are older than `rememberDuration` of this DStream.
+   * This is an internal method that should not be called directly. This
+   * implementation overrides the default implementation to clear received
+   * block information.
+   */
+  private[streaming] override def clearMetadata(time: Time) {
+    super.clearMetadata(time)
+    val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
+    receivedBlockInfo --= oldReceivedBlocks.keys
+    logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
+      (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
+  }
 }
 
 
 private[streaming] sealed trait NetworkReceiverMessage
-private[streaming] case class StopReceiver() extends NetworkReceiverMessage
-private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
-  extends NetworkReceiverMessage
-private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
+private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
 
 /**
  * Abstract class of a receiver that can be run on worker nodes to receive external data. See
@@ -177,6 +197,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
           case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
         }.mkString("\n")
     }
+
     logInfo("Deregistering receiver " + streamId)
     val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
     Await.result(future, askTimeout)
@@ -209,18 +230,28 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   /**
    * Push a block (as an ArrayBuffer filled with data) into the block manager.
    */
-  def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+  def pushBlock(
+      blockId: StreamBlockId,
+      arrayBuffer: ArrayBuffer[T],
+      metadata: Any,
+      level: StorageLevel
+    ) {
     env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
-    trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
+    trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
     logDebug("Pushed block " + blockId)
   }
 
   /**
    * Push a block (as bytes) into the block manager.
    */
-  def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
+  def pushBlock(
+      blockId: StreamBlockId,
+      bytes: ByteBuffer,
+      metadata: Any,
+      level: StorageLevel
+    ) {
     env.blockManager.putBytes(blockId, bytes, level)
-    trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
+    trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata))
   }
 
   /** Set the ID of the DStream that this receiver is associated with */
@@ -232,9 +263,11 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   private class NetworkReceiverActor extends Actor {
 
     override def preStart() {
-      logInfo("Registered receiver " + streamId)
-      val future = trackerActor.ask(RegisterReceiver(streamId, self))(askTimeout)
+      val msg = RegisterReceiver(
+        streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self)
+      val future = trackerActor.ask(msg)(askTimeout)
       Await.result(future, askTimeout)
+      logInfo("Registered receiver " + streamId)
     }
 
     override def receive() = {
@@ -253,7 +286,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   class BlockGenerator(storageLevel: StorageLevel)
     extends Serializable with Logging {
 
-    case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
+    case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)
 
     val clock = new SystemClock()
     val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)

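The receiver side now reports a ReceivedBlockInfo (stream id, block id, record count, metadata) for every pushed block, and the input DStream keeps that information keyed by batch time so the JobGenerator can attach it to the batch's JobSet; clearMetadata then drops entries older than rememberDuration. A standalone sketch of that per-batch bookkeeping (hypothetical names and simplified types, not Spark API):

  import scala.collection.mutable.HashMap

  case class BlockInfoSketch(blockId: String, numRecords: Long)

  class BlockBookkeeping(rememberMs: Long) {
    private val byBatch = new HashMap[Long, Array[BlockInfoSketch]]
    // Called from compute(): remember which blocks went into the batch at this time.
    def record(batchTimeMs: Long, blocks: Array[BlockInfoSketch]): Unit =
      byBatch(batchTimeMs) = blocks
    // Called when generating the batch's jobs: report per-batch input sizes to the UI.
    def forBatch(batchTimeMs: Long): Array[BlockInfoSketch] =
      byBatch.getOrElse(batchTimeMs, Array.empty)
    // Called from clearMetadata(): forget batches outside the remember window.
    def clearOlderThan(currentMs: Long): Unit =
      byBatch --= byBatch.keys.filter(_ <= currentMs - rememberMs).toSeq
  }
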
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 7f3cd2f..9c69a2a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time
  */
 case class BatchInfo(
     batchTime: Time,
+    receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
     submissionTime: Long,
     processingStartTime: Option[Long],
     processingEndTime: Option[Long]

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 92d885c..e564ecc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -201,7 +201,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
       timesToReschedule.mkString(", "))
     timesToReschedule.foreach(time =>
-      jobScheduler.runJobs(time, graph.generateJobs(time))
+      jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
     )
 
     // Restart the timer
@@ -214,7 +214,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     SparkEnv.set(ssc.env)
     Try(graph.generateJobs(time)) match {
       case Success(jobs) =>
-        jobScheduler.runJobs(time, jobs)
+        val receivedBlockInfo = graph.getNetworkInputStreams.map { stream =>
+          val streamId = stream.id
+          val receivedBlockInfo = stream.getReceivedBlockInfo(time)
+          (streamId, receivedBlockInfo)
+        }.toMap
+        jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 04e0a6a..d9ada99 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -100,14 +100,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     logInfo("Stopped JobScheduler")
   }
 
-  def runJobs(time: Time, jobs: Seq[Job]) {
-    if (jobs.isEmpty) {
-      logInfo("No jobs added for time " + time)
+  def submitJobSet(jobSet: JobSet) {
+    if (jobSet.jobs.isEmpty) {
+      logInfo("No jobs added for time " + jobSet.time)
     } else {
-      val jobSet = new JobSet(time, jobs)
-      jobSets.put(time, jobSet)
+      jobSets.put(jobSet.time, jobSet)
       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
-      logInfo("Added jobs for time " + time)
+      logInfo("Added jobs for time " + jobSet.time)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index fcf303a..a69d743 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time
   * belong to the same batch.
   */
 private[streaming]
-case class JobSet(time: Time, jobs: Seq[Job]) {
+case class JobSet(
+    time: Time,
+    jobs: Seq[Job],
+    receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty
+  ) {
 
   private val incompleteJobs = new HashSet[Job]()
   private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
   def toBatchInfo: BatchInfo = {
     new BatchInfo(
       time,
+      receivedBlockInfo,
       submissionTime,
       if (processingStartTime >= 0 ) Some(processingStartTime) else None,
       if (processingEndTime >= 0 ) Some(processingEndTime) else None

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 067e804..a1e6f51 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -17,20 +17,42 @@
 
 package org.apache.spark.streaming.scheduler
 
-import scala.collection.mutable.{HashMap, Queue, SynchronizedMap}
+import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue}
 
 import akka.actor._
+
 import org.apache.spark.{Logging, SparkEnv, SparkException}
 import org.apache.spark.SparkContext._
-import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver}
 import org.apache.spark.util.AkkaUtils
 
+/** Information about a receiver */
+case class ReceiverInfo(streamId: Int, typ: String, location: String) {
+  override def toString = s"$typ-$streamId"
+}
+
+/** Information about blocks received by the network receiver */
+case class ReceivedBlockInfo(
+    streamId: Int,
+    blockId: StreamBlockId,
+    numRecords: Long,
+    metadata: Any
+  )
+
+/**
+ * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate
+ * with each other.
+ */
 private[streaming] sealed trait NetworkInputTrackerMessage
-private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
-  extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
+private[streaming] case class RegisterReceiver(
+    streamId: Int,
+    typ: String,
+    host: String,
+    receiverActor: ActorRef
+  ) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
   extends NetworkInputTrackerMessage
 private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
   extends NetworkInputTrackerMessage
@@ -47,9 +69,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
   val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
   val receiverExecutor = new ReceiverExecutor()
   val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef]
-  val receivedBlockIds = new HashMap[Int, Queue[BlockId]] with SynchronizedMap[Int, Queue[BlockId]]
+  val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
+    with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
   val timeout = AkkaUtils.askTimeout(ssc.conf)
-
+  val listenerBus = ssc.scheduler.listenerBus
 
   // actor is created when generator starts.
   // This not being null means the tracker has been started and not stopped
@@ -83,12 +106,32 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
     }
   }
 
+  /** Return all the blocks received from a receiver. */
+  def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = {
+    val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true)
+    logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks")
+    receivedBlockInfo.toArray
+  }
+
+  private def getReceivedBlockInfoQueue(streamId: Int) = {
+    receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo])
+  }
+
   /** Register a receiver */
-  def registerReceiver(streamId: Int, receiverActor: ActorRef, sender: ActorRef) {
+  def registerReceiver(
+      streamId: Int,
+      typ: String,
+      host: String,
+      receiverActor: ActorRef,
+      sender: ActorRef
+    ) {
     if (!networkInputStreamMap.contains(streamId)) {
       throw new Exception("Register received for unexpected id " + streamId)
     }
     receiverInfo += ((streamId, receiverActor))
+    ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
+      ReceiverInfo(streamId, typ, host)
+    ))
     logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
   }
 
@@ -98,35 +141,26 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
     logError("Deregistered receiver for network stream " + streamId + " with message:\n" + message)
   }
 
-  /** Get all the received blocks for the given stream. */
-  def getBlocks(streamId: Int, time: Time): Array[BlockId] = {
-    val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId]())
-    val result = queue.dequeueAll(x => true).toArray
-    logInfo("Stream " + streamId + " received " + result.size + " blocks")
-    result
-  }
-
   /** Add new blocks for the given stream */
-  def addBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) = {
-    val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId])
-    queue ++= blockIds
-    networkInputStreamMap(streamId).addMetadata(metadata)
-    logDebug("Stream " + streamId + " received new blocks: " + blockIds.mkString("[", ", ", "]"))
+  def addBlocks(receivedBlockInfo: ReceivedBlockInfo) {
+    getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
+    logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " +
+      receivedBlockInfo.blockId)
   }
 
   /** Check if any blocks are left to be processed */
   def hasMoreReceivedBlockIds: Boolean = {
-    !receivedBlockIds.forall(_._2.isEmpty)
+    !receivedBlockInfo.values.forall(_.isEmpty)
   }
 
   /** Actor to receive messages from the receivers. */
   private class NetworkInputTrackerActor extends Actor {
     def receive = {
-      case RegisterReceiver(streamId, receiverActor) =>
-        registerReceiver(streamId, receiverActor, sender)
+      case RegisterReceiver(streamId, typ, host, receiverActor) =>
+        registerReceiver(streamId, typ, host, receiverActor, sender)
         sender ! true
-      case AddBlocks(streamId, blockIds, metadata) =>
-        addBlocks(streamId, blockIds, metadata)
+      case AddBlock(receivedBlockInfo) =>
+        addBlocks(receivedBlockInfo)
       case DeregisterReceiver(streamId, message) =>
         deregisterReceiver(streamId, message)
         sender ! true

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 461ea35..5db40eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -23,8 +23,11 @@ import org.apache.spark.util.Distribution
 /** Base trait for events related to StreamingListener */
 sealed trait StreamingListenerEvent
 
+case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
 case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
 case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
+  extends StreamingListenerEvent
 
 /** An event used in the listener to shutdown the listener daemon thread. */
 private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
@@ -34,14 +37,17 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen
  * computation.
  */
 trait StreamingListener {
-  /**
-   * Called when processing of a batch has completed
-   */
+
+  /** Called when a receiver has been started */
+  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
+
+  /** Called when a batch of jobs has been submitted for processing. */
+  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
+
+  /** Called when processing of a batch of jobs has completed. */
   def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
 
-  /**
-   * Called when processing of a batch has started
-   */
+  /** Called when processing of a batch of jobs has started.  */
   def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
 }
 

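With the two new events, applications (and the streaming UI itself) can observe receiver startup and batch submission in addition to batch start/completion. A hedged usage sketch (the listener class and its println output are illustrative, not part of this patch):

  import org.apache.spark.streaming.scheduler._

  class PrintingStreamingListener extends StreamingListener {
    override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
      println("Receiver started: " + receiverStarted.receiverInfo)
    }
    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
      println("Batch submitted for time " + batchSubmitted.batchInfo.batchTime)
    }
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
      println("Batch completed, processing delay: " + batchCompleted.batchInfo.processingDelay)
    }
  }
  // ssc.addStreamingListener(new PrintingStreamingListener)  // ssc: StreamingContext
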
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 18811fc..ea03dfc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -38,6 +38,10 @@ private[spark] class StreamingListenerBus() extends Logging {
       while (true) {
         val event = eventQueue.take
         event match {
+          case receiverStarted: StreamingListenerReceiverStarted =>
+            listeners.foreach(_.onReceiverStarted(receiverStarted))
+          case batchSubmitted: StreamingListenerBatchSubmitted =>
+            listeners.foreach(_.onBatchSubmitted(batchSubmitted))
           case batchStarted: StreamingListenerBatchStarted =>
             listeners.foreach(_.onBatchStarted(batchStarted))
           case batchCompleted: StreamingListenerBatchCompleted =>

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
new file mode 100644
index 0000000..8b025b0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.streaming.{Time, StreamingContext}
+import org.apache.spark.streaming.scheduler._
+import scala.collection.mutable.{Queue, HashMap}
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
+import org.apache.spark.streaming.scheduler.BatchInfo
+import org.apache.spark.streaming.scheduler.ReceiverInfo
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
+import org.apache.spark.util.Distribution
+
+
+private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener {
+
+  private val waitingBatchInfos = new HashMap[Time, BatchInfo]
+  private val runningBatchInfos = new HashMap[Time, BatchInfo]
+  private val completedaBatchInfos = new Queue[BatchInfo]
+  private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+  private var totalCompletedBatches = 0L
+  private val receiverInfos = new HashMap[Int, ReceiverInfo]
+
+  val batchDuration = ssc.graph.batchDuration.milliseconds
+
+  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
+    synchronized {
+      receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
+    }
+  }
+
+  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
+    runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+  }
+
+  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
+    runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
+    waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
+  }
+
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
+    waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+    runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+    completedaBatchInfos.enqueue(batchCompleted.batchInfo)
+    if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
+    totalCompletedBatches += 1L
+  }
+
+  def numNetworkReceivers = synchronized {
+    ssc.graph.getNetworkInputStreams().size
+  }
+
+  def numTotalCompletedBatches: Long = synchronized {
+    totalCompletedBatches
+  }
+
+  def numUnprocessedBatches: Long = synchronized {
+    waitingBatchInfos.size + runningBatchInfos.size
+  }
+
+  def waitingBatches: Seq[BatchInfo] = synchronized {
+    waitingBatchInfos.values.toSeq
+  }
+
+  def runningBatches: Seq[BatchInfo] = synchronized {
+    runningBatchInfos.values.toSeq
+  }
+
+  def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
+    completedaBatchInfos.toSeq
+  }
+
+  def processingDelayDistribution: Option[Distribution] = synchronized {
+    extractDistribution(_.processingDelay)
+  }
+
+  def schedulingDelayDistribution: Option[Distribution] = synchronized {
+    extractDistribution(_.schedulingDelay)
+  }
+
+  def totalDelayDistribution: Option[Distribution] = synchronized {
+    extractDistribution(_.totalDelay)
+  }
+
+  def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
+    val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
+    val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
+    (0 until numNetworkReceivers).map { receiverId =>
+      val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
+        batchInfo.get(receiverId).getOrElse(Array.empty)
+      }
+      val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
+      // calculate records per second for each batch
+        blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
+      }
+      val distributionOption = Distribution(recordsOfParticularReceiver)
+      (receiverId, distributionOption)
+    }.toMap
+  }
+
+  def lastReceivedBatchRecords: Map[Int, Long] = {
+    val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
+    lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
+      (0 until numNetworkReceivers).map { receiverId =>
+        (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
+      }.toMap
+    }.getOrElse {
+      (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
+    }
+  }
+
+  def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
+    receiverInfos.get(receiverId)
+  }
+
+  def lastCompletedBatch: Option[BatchInfo] = {
+    completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+  }
+
+  def lastReceivedBatch: Option[BatchInfo] = {
+    retainedBatches.lastOption
+  }
+
+  private def retainedBatches: Seq[BatchInfo] = synchronized {
+    (waitingBatchInfos.values.toSeq ++
+      runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
+  }
+
+  private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
+    Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+  }
+}

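In receivedRecordsDistributions above, each batch's per-receiver record count is scaled by the batch interval to give a records-per-second figure before the quantile Distribution is computed. A small worked example with assumed numbers:

  // Assumed: a 1000 ms batch interval, and one receiver delivering two blocks in a batch.
  val batchDurationMs = 1000L
  val recordsInBatch = Seq(300L, 700L).sum                       // 1000 records in the batch
  val recordsPerSecond = recordsInBatch.toDouble * 1000 / batchDurationMs
  // recordsPerSecond == 1000.0, i.e. that batch contributes 1000 records/sec to the distribution
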
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
new file mode 100644
index 0000000..6607437
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import java.util.Calendar
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.Logging
+import org.apache.spark.ui._
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.util.Distribution
+
+/** Page for Spark Web UI that shows statistics of a streaming job */
+private[ui] class StreamingPage(parent: StreamingTab)
+  extends WebUIPage("") with Logging {
+
+  private val listener = parent.listener
+  private val startTime = Calendar.getInstance().getTime()
+  private val emptyCell = "-"
+
+  /** Render the page */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val content =
+      generateBasicStats() ++ <br></br> ++
+      <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
+      generateNetworkStatsTable() ++
+      generateBatchStatsTable()
+    UIUtils.headerSparkPage(
+      content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000))
+  }
+
+  /** Generate basic stats of the streaming program */
+  private def generateBasicStats(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+    <ul class ="unstyled">
+      <li>
+        <strong>Started at: </strong> {startTime.toString}
+      </li>
+      <li>
+        <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
+      </li>
+      <li>
+        <strong>Network receivers: </strong>{listener.numNetworkReceivers}
+      </li>
+      <li>
+        <strong>Batch interval: </strong>{formatDurationVerbose(listener.batchDuration)}
+      </li>
+      <li>
+        <strong>Processed batches: </strong>{listener.numTotalCompletedBatches}
+      </li>
+      <li>
+        <strong>Waiting batches: </strong>{listener.numUnprocessedBatches}
+      </li>
+    </ul>
+  }
+
+  /** Generate stats of data received over the network by the streaming program */
+  private def generateNetworkStatsTable(): Seq[Node] = {
+    val receivedRecordDistributions = listener.receivedRecordsDistributions
+    val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
+    val table = if (receivedRecordDistributions.size > 0) {
+      val headerRow = Seq(
+        "Receiver",
+        "Location",
+        "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
+        "Minimum rate\n[records/sec]",
+        "25th percentile rate\n[records/sec]",
+        "Median rate\n[records/sec]",
+        "75th percentile rate\n[records/sec]",
+        "Maximum rate\n[records/sec]"
+      )
+      val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
+        val receiverInfo = listener.receiverInfo(receiverId)
+        val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
+        val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
+        val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
+        val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
+          d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
+        }.getOrElse {
+          Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
+        }
+        Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
+      }
+      Some(listingTable(headerRow, dataRows))
+    } else {
+      None
+    }
+
+    val content =
+      <h5>Network Input Statistics</h5> ++
+      <div>{table.getOrElse("No network receivers")}</div>
+
+    content
+  }
+
+  /** Generate stats of batch jobs of the streaming program */
+  private def generateBatchStatsTable(): Seq[Node] = {
+    val numBatches = listener.retainedCompletedBatches.size
+    val lastCompletedBatch = listener.lastCompletedBatch
+    val table = if (numBatches > 0) {
+      val processingDelayQuantilesRow = {
+        Seq(
+          "Processing Time",
+          formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay))
+        ) ++ getQuantiles(listener.processingDelayDistribution)
+      }
+      val schedulingDelayQuantilesRow = {
+        Seq(
+          "Scheduling Delay",
+          formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay))
+        ) ++ getQuantiles(listener.schedulingDelayDistribution)
+      }
+      val totalDelayQuantilesRow = {
+        Seq(
+          "Total Delay",
+          formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay))
+        ) ++ getQuantiles(listener.totalDelayDistribution)
+      }
+      val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
+        "Median", "75th percentile", "Maximum")
+      val dataRows: Seq[Seq[String]] = Seq(
+        processingDelayQuantilesRow,
+        schedulingDelayQuantilesRow,
+        totalDelayQuantilesRow
+      )
+      Some(listingTable(headerRow, dataRows))
+    } else {
+      None
+    }
+
+    val content =
+      <h5>Batch Processing Statistics</h5> ++
+      <div>
+        <ul class="unstyled">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </ul>
+      </div>
+
+    content
+  }
+
+
+  /**
+   * Returns a human-readable string representing a duration such as "5 second 35 ms"
+   */
+  private def formatDurationOption(msOption: Option[Long]): String = {
+    msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+  }
+
+  /** Get quantiles for any time distribution */
+  private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
+    timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) }
+  }
+
+  /** Generate HTML table from string data */
+  private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
+    def generateDataRow(data: Seq[String]): Seq[Node] = {
+      <tr> {data.map(d => <td>{d}</td>)} </tr>
+    }
+    UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
new file mode 100644
index 0000000..51448d1
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.ui.WebUITab
+
+/** Spark Web UI tab that shows statistics of a streaming job */
+private[spark] class StreamingTab(ssc: StreamingContext)
+  extends WebUITab(ssc.sc.ui, "streaming") with Logging {
+
+  val parent = ssc.sc.ui
+  val appName = parent.appName
+  val basePath = parent.basePath
+  val listener = new StreamingJobProgressListener(ssc)
+
+  ssc.addStreamingListener(listener)
+  attachPage(new StreamingPage(this))
+  parent.attachTab(this)
+}

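Because StreamingContext now creates a StreamingTab itself (the uiTab field added earlier in this patch), the tab attaches to the driver's SparkUI without any user code. A minimal hedged sketch of how it would be exercised (the socket source on localhost:9999 is an assumption):

  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext("local[2]", "streaming-ui-demo", Seconds(1))
  val lines = ssc.socketTextStream("localhost", 9999)   // assumed data source
  lines.count().print()
  ssc.start()
  // The "Streaming" tab (by default at http://<driver>:4040/streaming) now shows the
  // receiver table and the batch processing statistics rendered by StreamingPage.
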
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 389b23d..952511d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -239,11 +239,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
 
 /** This is a server to test the network input stream */
-class TestServer() extends Logging {
+class TestServer(portToBind: Int = 0) extends Logging {
 
   val queue = new ArrayBlockingQueue[String](100)
 
-  val serverSocket = new ServerSocket(0)
+  val serverSocket = new ServerSocket(portToBind)
 
   val servingThread = new Thread() {
     override def run() {
@@ -282,7 +282,7 @@ class TestServer() extends Logging {
 
   def start() { servingThread.start() }
 
-  def send(msg: String) { queue.add(msg) }
+  def send(msg: String) { queue.put(msg) }
 
   def stop() { servingThread.interrupt() }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9cc27ef..efd0d22 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -161,7 +161,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     }
   }
 
-
   test("stop only streaming context") {
     ssc = new StreamingContext(master, appName, batchDuration)
     sc = ssc.sparkContext

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
new file mode 100644
index 0000000..35538ec
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.io.Source
+
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+class UISuite extends FunSuite {
+
+  test("streaming tab in spark UI") {
+    val ssc = new StreamingContext("local", "test", Seconds(1))
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
+      assert(!html.contains("random data that should not be present"))
+      // test that the streaming tab exists
+      assert(html.toLowerCase.contains("streaming"))
+      // test if other Spark tabs still exist
+      assert(html.toLowerCase.contains("stages"))
+    }
+
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      val html = Source.fromURL(
+        ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+      assert(html.toLowerCase.contains("batch"))
+      assert(html.toLowerCase.contains("network"))
+    }
+  }
+}


[2/3] [SPARK-1386] Web UI for Spark Streaming

Posted by pw...@apache.org.
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index a7cf04b..6a2d652 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -17,16 +17,115 @@
 
 package org.apache.spark.ui
 
+import java.text.SimpleDateFormat
+import java.util.{Locale, Date}
+
 import scala.xml.Node
+import org.apache.spark.Logging
 
 /** Utility functions for generating XML pages with spark content. */
-private[spark] object UIUtils {
+private[spark] object UIUtils extends Logging {
+
+  // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  }
+
+  def formatDate(date: Date): String = dateFormat.get.format(date)
+
+  def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
+
+  def formatDuration(milliseconds: Long): String = {
+    val seconds = milliseconds.toDouble / 1000
+    if (seconds < 60) {
+      return "%.0f s".format(seconds)
+    }
+    val minutes = seconds / 60
+    if (minutes < 10) {
+      return "%.1f min".format(minutes)
+    } else if (minutes < 60) {
+      return "%.0f min".format(minutes)
+    }
+    val hours = minutes / 60
+    "%.1f h".format(hours)
+  }
+
+  /** Generate a verbose human-readable string representing a duration such as "5 seconds 35 ms" */
+  def formatDurationVerbose(ms: Long): String = {
+    try {
+      val second = 1000L
+      val minute = 60 * second
+      val hour = 60 * minute
+      val day = 24 * hour
+      val week = 7 * day
+      val year = 365 * day
+
+      def toString(num: Long, unit: String): String = {
+        if (num == 0) {
+          ""
+        } else if (num == 1) {
+          s"$num $unit"
+        } else {
+          s"$num ${unit}s"
+        }
+      }
+
+      val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms"
+      val secondString = toString((ms % minute) / second, "second")
+      val minuteString = toString((ms % hour) / minute, "minute")
+      val hourString = toString((ms % day) / hour, "hour")
+      val dayString = toString((ms % week) / day, "day")
+      val weekString = toString((ms % year) / week, "week")
+      val yearString = toString(ms / year, "year")
 
-  import Page._
+      Seq(
+        second -> millisecondsString,
+        minute -> s"$secondString $millisecondsString",
+        hour -> s"$minuteString $secondString",
+        day -> s"$hourString $minuteString $secondString",
+        week -> s"$dayString $hourString $minuteString",
+        year -> s"$weekString $dayString $hourString"
+      ).foreach { case (durationLimit, durationString) =>
+        if (ms < durationLimit) {
+          // if the time is less than this limit (up to a year)
+          return durationString
+        }
+      }
+      // if time is more than a year
+      return s"$yearString $weekString $dayString"
+    } catch {
+      case e: Exception =>
+        logError("Error converting time to string", e)
+        // if there is an error, return a blank string
+        return ""
+    }
+  }
+
+  /** Generate a human-readable string representing a number (e.g. 100 K) */
+  def formatNumber(records: Double): String = {
+    val trillion = 1e12
+    val billion = 1e9
+    val million = 1e6
+    val thousand = 1e3
+
+    val (value, unit) = {
+      if (records >= 2*trillion) {
+        (records / trillion, " T")
+      } else if (records >= 2*billion) {
+        (records / billion, " B")
+      } else if (records >= 2*million) {
+        (records / million, " M")
+      } else if (records >= 2*thousand) {
+        (records / thousand, " K")
+      } else {
+        (records, "")
+      }
+    }
+    "%.1f%s".formatLocal(Locale.US, value, unit)
+  }
 
   // Yarn has to go through a proxy so the base uri is provided and has to be on all links
-  private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
-    getOrElse("")
+  val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
 
   def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
 
@@ -36,26 +135,14 @@ private[spark] object UIUtils {
       basePath: String,
       appName: String,
       title: String,
-      page: Page.Value) : Seq[Node] = {
-    val jobs = page match {
-      case Stages =>
-        <li class="active"><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
-    }
-    val storage = page match {
-      case Storage =>
-        <li class="active"><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
-    }
-    val environment = page match {
-      case Environment =>
-        <li class="active"><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
-    }
-    val executors = page match {
-      case Executors =>
-        <li class="active"><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
+      tabs: Seq[WebUITab],
+      activeTab: WebUITab,
+      refreshInterval: Option[Int] = None): Seq[Node] = {
+
+    val header = tabs.map { tab =>
+      <li class={if (tab == activeTab) "active" else ""}>
+        <a href={prependBaseUri(basePath, "/" + tab.prefix)}>{tab.name}</a>
+      </li>
     }
 
     <html>
@@ -74,16 +161,10 @@ private[spark] object UIUtils {
             <a href={prependBaseUri(basePath, "/")} class="brand">
               <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
             </a>
-            <ul class="nav">
-              {jobs}
-              {storage}
-              {environment}
-              {executors}
-            </ul>
+            <ul class="nav">{header}</ul>
             <p class="navbar-text pull-right"><strong>{appName}</strong> application UI</p>
           </div>
         </div>
-
         <div class="container-fluid">
           <div class="row-fluid">
             <div class="span12">
@@ -129,21 +210,36 @@ private[spark] object UIUtils {
   /** Returns an HTML table constructed by generating a row for each object in a sequence. */
   def listingTable[T](
       headers: Seq[String],
-      makeRow: T => Seq[Node],
-      rows: Seq[T],
+      generateDataRow: T => Seq[Node],
+      data: Seq[T],
       fixedWidth: Boolean = false): Seq[Node] = {
 
-    val colWidth = 100.toDouble / headers.size
-    val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
     var tableClass = "table table-bordered table-striped table-condensed sortable"
     if (fixedWidth) {
       tableClass += " table-fixed"
     }
-
+    val colWidth = 100.toDouble / headers.size
+    val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
+    val headerRow: Seq[Node] = {
+      // if none of the headers have "\n" in them
+      if (headers.forall(!_.contains("\n"))) {
+        // represent header as simple text
+        headers.map(h => <th width={colWidthAttr}>{h}</th>)
+      } else {
+        // represent the header text as a list, splitting on "\n"
+        headers.map { case h =>
+          <th width={colWidthAttr}>
+            <ul class="unstyled">
+              { h.split("\n").map { case t => <li> {t} </li> } }
+            </ul>
+          </th>
+        }
+      }
+    }
     <table class={tableClass}>
-      <thead>{headers.map(h => <th width={colWidthAttr}>{h}</th>)}</thead>
+      <thead>{headerRow}</thead>
       <tbody>
-        {rows.map(r => makeRow(r))}
+        {data.map(r => generateDataRow(r))}
       </tbody>
     </table>
   }
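
As a rough illustration of the new formatting helpers, a minimal sketch of expected outputs; UIUtils is private[spark], so this assumes code compiled under the org.apache.spark package, and the object name is hypothetical.

package org.apache.spark.ui

object UIUtilsFormattingSketch {
  def main(args: Array[String]) {
    println(UIUtils.formatDuration(95035))         // "1.6 min"
    println(UIUtils.formatDurationVerbose(95035))  // "1 minute 35 seconds"
    println(UIUtils.formatNumber(3500000))         // "3.5 M"
  }
}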

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 2cc7582..b08f308 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -17,53 +17,134 @@
 
 package org.apache.spark.ui
 
-import java.text.SimpleDateFormat
-import java.util.Date
+import javax.servlet.http.HttpServletRequest
 
-private[spark] abstract class WebUI(name: String) {
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.Node
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.json4s.JsonAST.{JNothing, JValue}
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
+
+/**
+ * The top level component of the UI hierarchy that contains the server.
+ *
+ * Each WebUI represents a collection of tabs, each of which in turn represents a collection of
+ * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
+ */
+private[spark] abstract class WebUI(
+    securityManager: SecurityManager,
+    port: Int,
+    conf: SparkConf,
+    basePath: String = "")
+  extends Logging {
+
+  protected val tabs = ArrayBuffer[WebUITab]()
+  protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
+  protected val localHostName = Utils.localHostName()
+  protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+  private val className = Utils.getFormattedClassName(this)
+
+  def getTabs: Seq[WebUITab] = tabs.toSeq
+  def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
+
+  /** Attach a tab to this UI, along with all of its attached pages. */
+  def attachTab(tab: WebUITab) {
+    tab.pages.foreach(attachPage)
+    tabs += tab
+  }
+
+  /** Attach a page to this UI. */
+  def attachPage(page: WebUIPage) {
+    val pagePath = "/" + page.prefix
+    attachHandler(createServletHandler(pagePath,
+      (request: HttpServletRequest) => page.render(request), securityManager, basePath))
+    attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
+      (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
+  }
+
+  /** Attach a handler to this UI. */
+  def attachHandler(handler: ServletContextHandler) {
+    handlers += handler
+    serverInfo.foreach { info =>
+      info.rootHandler.addHandler(handler)
+      if (!handler.isStarted) {
+        handler.start()
+      }
+    }
+  }
 
-  /**
-   * Bind to the HTTP server behind this web interface.
-   * Overridden implementation should set serverInfo.
-   */
-  def bind() { }
+  /** Detach a handler from this UI. */
+  def detachHandler(handler: ServletContextHandler) {
+    handlers -= handler
+    serverInfo.foreach { info =>
+      info.rootHandler.removeHandler(handler)
+      if (handler.isStarted) {
+        handler.stop()
+      }
+    }
+  }
+
+  /** Initialize all components of the server. */
+  def initialize()
+
+  /** Bind to the HTTP server behind this web interface. */
+  def bind() {
+    assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
+    try {
+      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+      logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
+    } catch {
+      case e: Exception =>
+        logError("Failed to bind %s".format(className), e)
+        System.exit(1)
+    }
+  }
 
   /** Return the actual port to which this server is bound. Only valid after bind(). */
   def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
 
   /** Stop the server behind this web interface. Only valid after bind(). */
   def stop() {
-    assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(name))
+    assert(serverInfo.isDefined,
+      "Attempted to stop %s before binding to a server!".format(className))
     serverInfo.get.server.stop()
   }
 }
 
+
 /**
- * Utilities used throughout the web UI.
+ * A tab that represents a collection of pages.
+ * The prefix is appended to the parent address to form a full path, and must not contain slashes.
  */
-private[spark] object WebUI {
-  // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
-  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
-    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
+  val pages = ArrayBuffer[WebUIPage]()
+  val name = prefix.capitalize
+
+  /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
+  def attachPage(page: WebUIPage) {
+    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
+    pages += page
   }
 
-  def formatDate(date: Date): String = dateFormat.get.format(date)
+  /** Get a list of header tabs from the parent UI. */
+  def headerTabs: Seq[WebUITab] = parent.getTabs
+}
 
-  def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
 
-  def formatDuration(milliseconds: Long): String = {
-    val seconds = milliseconds.toDouble / 1000
-    if (seconds < 60) {
-      return "%.0f s".format(seconds)
-    }
-    val minutes = seconds / 60
-    if (minutes < 10) {
-      return "%.1f min".format(minutes)
-    } else if (minutes < 60) {
-      return "%.0f min".format(minutes)
-    }
-    val hours = minutes / 60
-    "%.1f h".format(hours)
-  }
+/**
+ * A page that represents the leaf node in the UI hierarchy.
+ *
+ * The direct parent of a WebUIPage is not specified, as it can be either a WebUI or a WebUITab.
+ * If the parent is a WebUI, the prefix is appended to the parent's address to form a full path.
+ * Otherwise, if the parent is a WebUITab, the prefix is appended to the parent tab's prefix
+ * to form a relative path. The prefix must not contain slashes.
+ */
+private[spark] abstract class WebUIPage(var prefix: String) {
+  def render(request: HttpServletRequest): Seq[Node]
+  def renderJson(request: HttpServletRequest): JValue = JNothing
 }
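
To make the tab/page hierarchy concrete, a minimal sketch of a custom tab with one page. HelloTab and HelloPage are hypothetical names; since SparkUI and these classes are private[spark], the sketch assumes code compiled under the org.apache.spark package and is not part of the patch.

package org.apache.spark.ui.example

import javax.servlet.http.HttpServletRequest
import scala.xml.Node

import org.apache.spark.ui.{SparkUI, WebUIPage, WebUITab}

private[spark] class HelloTab(parent: SparkUI) extends WebUITab(parent, "hello") {
  attachPage(new HelloPage(this))  // page served at /hello
  parent.attachTab(this)           // adds "Hello" to the navigation bar
}

private[spark] class HelloPage(parent: HelloTab) extends WebUIPage("") {
  def render(request: HttpServletRequest): Seq[Node] =
    <p>Hello from a custom tab</p>
}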

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
new file mode 100644
index 0000000..b347eb1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.env
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
+  private val appName = parent.appName
+  private val basePath = parent.basePath
+  private val listener = parent.listener
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val runtimeInformationTable = UIUtils.listingTable(
+      propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
+    val sparkPropertiesTable = UIUtils.listingTable(
+      propertyHeader, propertyRow, listener.sparkProperties, fixedWidth = true)
+    val systemPropertiesTable = UIUtils.listingTable(
+      propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
+    val classpathEntriesTable = UIUtils.listingTable(
+      classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
+    val content =
+      <span>
+        <h4>Runtime Information</h4> {runtimeInformationTable}
+        <h4>Spark Properties</h4> {sparkPropertiesTable}
+        <h4>System Properties</h4> {systemPropertiesTable}
+        <h4>Classpath Entries</h4> {classpathEntriesTable}
+      </span>
+
+    UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent)
+  }
+
+  private def propertyHeader = Seq("Name", "Value")
+  private def classPathHeaders = Seq("Resource", "Source")
+  private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+  private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+  private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
new file mode 100644
index 0000000..03b46e1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.env
+
+import org.apache.spark.scheduler._
+import org.apache.spark.ui._
+
+private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") {
+  val appName = parent.appName
+  val basePath = parent.basePath
+  val listener = new EnvironmentListener
+
+  attachPage(new EnvironmentPage(this))
+  parent.registerListener(listener)
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the EnvironmentTab
+ */
+private[ui] class EnvironmentListener extends SparkListener {
+  var jvmInformation = Seq[(String, String)]()
+  var sparkProperties = Seq[(String, String)]()
+  var systemProperties = Seq[(String, String)]()
+  var classpathEntries = Seq[(String, String)]()
+
+  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
+    synchronized {
+      val environmentDetails = environmentUpdate.environmentDetails
+      jvmInformation = environmentDetails("JVM Information")
+      sparkProperties = environmentDetails("Spark Properties")
+      systemProperties = environmentDetails("System Properties")
+      classpathEntries = environmentDetails("Classpath Entries")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
deleted file mode 100644
index 33df971..0000000
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.env
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.scheduler._
-import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.Page.Environment
-
-private[ui] class EnvironmentUI(parent: SparkUI) {
-  private val basePath = parent.basePath
-  private var _listener: Option[EnvironmentListener] = None
-
-  private def appName = parent.appName
-
-  lazy val listener = _listener.get
-
-  def start() {
-    _listener = Some(new EnvironmentListener)
-  }
-
-  def getHandlers = Seq[ServletContextHandler](
-    createServletHandler("/environment",
-      (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
-  )
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val runtimeInformationTable = UIUtils.listingTable(
-      propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
-    val sparkPropertiesTable = UIUtils.listingTable(
-      propertyHeader, propertyRow, listener.sparkProperties, fixedWidth = true)
-    val systemPropertiesTable = UIUtils.listingTable(
-      propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
-    val classpathEntriesTable = UIUtils.listingTable(
-      classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
-    val content =
-      <span>
-        <h4>Runtime Information</h4> {runtimeInformationTable}
-        <h4>Spark Properties</h4> {sparkPropertiesTable}
-        <h4>System Properties</h4> {systemPropertiesTable}
-        <h4>Classpath Entries</h4> {classpathEntriesTable}
-      </span>
-
-    UIUtils.headerSparkPage(content, basePath, appName, "Environment", Environment)
-  }
-
-  private def propertyHeader = Seq("Name", "Value")
-  private def classPathHeaders = Seq("Resource", "Source")
-  private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
-  private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
-  private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
-}
-
-/**
- * A SparkListener that prepares information to be displayed on the EnvironmentUI
- */
-private[ui] class EnvironmentListener extends SparkListener {
-  var jvmInformation = Seq[(String, String)]()
-  var sparkProperties = Seq[(String, String)]()
-  var systemProperties = Seq[(String, String)]()
-  var classpathEntries = Seq[(String, String)]()
-
-  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
-    synchronized {
-      val environmentDetails = environmentUpdate.environmentDetails
-      jvmInformation = environmentDetails("JVM Information")
-      sparkProperties = environmentDetails("Spark Properties")
-      systemProperties = environmentDetails("System Properties")
-      classpathEntries = environmentDetails("Classpath Entries")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
new file mode 100644
index 0000000..c1e69f6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
+  private val appName = parent.appName
+  private val basePath = parent.basePath
+  private val listener = parent.listener
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val storageStatusList = listener.storageStatusList
+    val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
+    val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
+    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
+    val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
+    val execInfoSorted = execInfo.sortBy(_.getOrElse("Executor ID", ""))
+    val execTable = UIUtils.listingTable(execHeader, execRow, execInfoSorted)
+
+    val content =
+      <div class="row-fluid">
+        <div class="span12">
+          <ul class="unstyled">
+            <li><strong>Memory:</strong>
+              {Utils.bytesToString(memUsed)} Used
+              ({Utils.bytesToString(maxMem)} Total) </li>
+            <li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
+          </ul>
+        </div>
+      </div>
+      <div class = "row">
+        <div class="span12">
+          {execTable}
+        </div>
+      </div>;
+
+    UIUtils.headerSparkPage(content, basePath, appName, "Executors (" + execInfo.size + ")",
+      parent.headerTabs, parent)
+  }
+
+  /** Header fields for the executors table */
+  private def execHeader = Seq(
+    "Executor ID",
+    "Address",
+    "RDD Blocks",
+    "Memory Used",
+    "Disk Used",
+    "Active Tasks",
+    "Failed Tasks",
+    "Complete Tasks",
+    "Total Tasks",
+    "Task Time",
+    "Shuffle Read",
+    "Shuffle Write")
+
+  /** Render an HTML row representing an executor */
+  private def execRow(values: Map[String, String]): Seq[Node] = {
+    val maximumMemory = values("Maximum Memory")
+    val memoryUsed = values("Memory Used")
+    val diskUsed = values("Disk Used")
+    <tr>
+      <td>{values("Executor ID")}</td>
+      <td>{values("Address")}</td>
+      <td>{values("RDD Blocks")}</td>
+      <td sorttable_customkey={memoryUsed}>
+        {Utils.bytesToString(memoryUsed.toLong)} /
+        {Utils.bytesToString(maximumMemory.toLong)}
+      </td>
+      <td sorttable_customkey={diskUsed}>
+        {Utils.bytesToString(diskUsed.toLong)}
+      </td>
+      <td>{values("Active Tasks")}</td>
+      <td>{values("Failed Tasks")}</td>
+      <td>{values("Complete Tasks")}</td>
+      <td>{values("Total Tasks")}</td>
+      <td>{Utils.msDurationToString(values("Task Time").toLong)}</td>
+      <td>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
+      <td>{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
+    </tr>
+  }
+
+  /** Represent an executor's info as a map given a storage status index */
+  private def getExecInfo(statusId: Int): Map[String, String] = {
+    val status = listener.storageStatusList(statusId)
+    val execId = status.blockManagerId.executorId
+    val hostPort = status.blockManagerId.hostPort
+    val rddBlocks = status.blocks.size
+    val memUsed = status.memUsed()
+    val maxMem = status.maxMem
+    val diskUsed = status.diskUsed()
+    val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
+    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
+    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
+    val totalTasks = activeTasks + failedTasks + completedTasks
+    val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+    val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
+    val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
+
+    // Also include fields not in the header
+    val execFields = execHeader ++ Seq("Maximum Memory")
+
+    val execValues = Seq(
+      execId,
+      hostPort,
+      rddBlocks,
+      memUsed,
+      diskUsed,
+      activeTasks,
+      failedTasks,
+      completedTasks,
+      totalTasks,
+      totalDuration,
+      totalShuffleRead,
+      totalShuffleWrite,
+      maxMem
+    ).map(_.toString)
+
+    execFields.zip(execValues).toMap
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
new file mode 100644
index 0000000..5678bf3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.StorageStatusListener
+import org.apache.spark.ui.{SparkUI, WebUITab}
+
+private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") {
+  val appName = parent.appName
+  val basePath = parent.basePath
+  val listener = new ExecutorsListener(parent.storageStatusListener)
+
+  attachPage(new ExecutorsPage(this))
+  parent.registerListener(listener)
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the ExecutorsTab
+ */
+private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
+  extends SparkListener {
+
+  val executorToTasksActive = HashMap[String, Int]()
+  val executorToTasksComplete = HashMap[String, Int]()
+  val executorToTasksFailed = HashMap[String, Int]()
+  val executorToDuration = HashMap[String, Long]()
+  val executorToShuffleRead = HashMap[String, Long]()
+  val executorToShuffleWrite = HashMap[String, Long]()
+
+  def storageStatusList = storageStatusListener.storageStatusList
+
+  override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+    val eid = formatExecutorId(taskStart.taskInfo.executorId)
+    executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+    val info = taskEnd.taskInfo
+    if (info != null) {
+      val eid = formatExecutorId(info.executorId)
+      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
+      executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
+      taskEnd.reason match {
+        case e: ExceptionFailure =>
+          executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+        case _ =>
+          executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+      }
+
+      // Update shuffle read/write
+      val metrics = taskEnd.taskMetrics
+      if (metrics != null) {
+        metrics.shuffleReadMetrics.foreach { shuffleRead =>
+          executorToShuffleRead(eid) =
+            executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
+        }
+        metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+          executorToShuffleWrite(eid) =
+            executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
+        }
+      }
+    }
+  }
+
+  // This addresses executor ID inconsistencies in local mode
+  private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
+}
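
A standalone sketch of the counter pattern the listener relies on: mutable HashMaps keyed by executor ID, updated with getOrElse defaults. The names below are illustrative and not part of the patch.

import scala.collection.mutable.HashMap

object ExecutorCounterSketch {
  val tasksActive = HashMap[String, Int]()

  def onTaskStart(execId: String) {
    tasksActive(execId) = tasksActive.getOrElse(execId, 0) + 1
  }

  def onTaskEnd(execId: String) {
    tasksActive(execId) = tasksActive.getOrElse(execId, 1) - 1
  }

  def main(args: Array[String]) {
    onTaskStart("0"); onTaskStart("0"); onTaskEnd("0")
    println(tasksActive("0"))  // 1
  }
}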

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
deleted file mode 100644
index 77a38a1..0000000
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.exec
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.collection.mutable.HashMap
-import scala.xml.Node
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.ExceptionFailure
-import org.apache.spark.scheduler._
-import org.apache.spark.storage.StorageStatusListener
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.Page.Executors
-import org.apache.spark.ui.{SparkUI, UIUtils}
-import org.apache.spark.util.Utils
-
-private[ui] class ExecutorsUI(parent: SparkUI) {
-  private val basePath = parent.basePath
-  private var _listener: Option[ExecutorsListener] = None
-
-  private def appName = parent.appName
-
-  lazy val listener = _listener.get
-
-  def start() {
-    _listener = Some(new ExecutorsListener(parent.storageStatusListener))
-  }
-
-  def getHandlers = Seq[ServletContextHandler](
-    createServletHandler("/executors",
-      (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
-  )
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val storageStatusList = listener.storageStatusList
-    val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
-    val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
-    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
-    val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
-    val execInfoSorted = execInfo.sortBy(_.getOrElse("Executor ID", ""))
-    val execTable = UIUtils.listingTable(execHeader, execRow, execInfoSorted)
-
-    val content =
-      <div class="row-fluid">
-        <div class="span12">
-          <ul class="unstyled">
-            <li><strong>Memory:</strong>
-              {Utils.bytesToString(memUsed)} Used
-              ({Utils.bytesToString(maxMem)} Total) </li>
-            <li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
-          </ul>
-        </div>
-      </div>
-      <div class = "row">
-        <div class="span12">
-          {execTable}
-        </div>
-      </div>;
-
-    UIUtils.headerSparkPage(
-      content, basePath, appName, "Executors (" + execInfo.size + ")", Executors)
-  }
-
-  /** Header fields for the executors table */
-  private def execHeader = Seq(
-    "Executor ID",
-    "Address",
-    "RDD Blocks",
-    "Memory Used",
-    "Disk Used",
-    "Active Tasks",
-    "Failed Tasks",
-    "Complete Tasks",
-    "Total Tasks",
-    "Task Time",
-    "Shuffle Read",
-    "Shuffle Write")
-
-  /** Render an HTML row representing an executor */
-  private def execRow(values: Map[String, String]): Seq[Node] = {
-    val maximumMemory = values("Maximum Memory")
-    val memoryUsed = values("Memory Used")
-    val diskUsed = values("Disk Used")
-    <tr>
-      <td>{values("Executor ID")}</td>
-      <td>{values("Address")}</td>
-      <td>{values("RDD Blocks")}</td>
-      <td sorttable_customkey={memoryUsed}>
-        {Utils.bytesToString(memoryUsed.toLong)} /
-        {Utils.bytesToString(maximumMemory.toLong)}
-      </td>
-      <td sorttable_customkey={diskUsed}>
-        {Utils.bytesToString(diskUsed.toLong)}
-      </td>
-      <td>{values("Active Tasks")}</td>
-      <td>{values("Failed Tasks")}</td>
-      <td>{values("Complete Tasks")}</td>
-      <td>{values("Total Tasks")}</td>
-      <td>{Utils.msDurationToString(values("Task Time").toLong)}</td>
-      <td>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
-      <td>{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
-    </tr>
-  }
-
-  /** Represent an executor's info as a map given a storage status index */
-  private def getExecInfo(statusId: Int): Map[String, String] = {
-    val status = listener.storageStatusList(statusId)
-    val execId = status.blockManagerId.executorId
-    val hostPort = status.blockManagerId.hostPort
-    val rddBlocks = status.blocks.size
-    val memUsed = status.memUsed()
-    val maxMem = status.maxMem
-    val diskUsed = status.diskUsed()
-    val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
-    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
-    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
-    val totalTasks = activeTasks + failedTasks + completedTasks
-    val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
-    val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
-    val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
-
-    // Also include fields not in the header
-    val execFields = execHeader ++ Seq("Maximum Memory")
-
-    val execValues = Seq(
-      execId,
-      hostPort,
-      rddBlocks,
-      memUsed,
-      diskUsed,
-      activeTasks,
-      failedTasks,
-      completedTasks,
-      totalTasks,
-      totalDuration,
-      totalShuffleRead,
-      totalShuffleWrite,
-      maxMem
-    ).map(_.toString)
-
-    execFields.zip(execValues).toMap
-  }
-}
-
-/**
- * A SparkListener that prepares information to be displayed on the ExecutorsUI
- */
-private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
-  val executorToTasksActive = HashMap[String, Int]()
-  val executorToTasksComplete = HashMap[String, Int]()
-  val executorToTasksFailed = HashMap[String, Int]()
-  val executorToDuration = HashMap[String, Long]()
-  val executorToShuffleRead = HashMap[String, Long]()
-  val executorToShuffleWrite = HashMap[String, Long]()
-
-  def storageStatusList = storageStatusListener.storageStatusList
-
-  override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val eid = formatExecutorId(taskStart.taskInfo.executorId)
-    executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val info = taskEnd.taskInfo
-    if (info != null) {
-      val eid = formatExecutorId(info.executorId)
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
-      executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
-      taskEnd.reason match {
-        case e: ExceptionFailure =>
-          executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
-        case _ =>
-          executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
-      }
-
-      // Update shuffle read/write
-      val metrics = taskEnd.taskMetrics
-      if (metrics != null) {
-        metrics.shuffleReadMetrics.foreach { shuffleRead =>
-          executorToShuffleRead(eid) =
-            executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
-        }
-        metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
-          executorToShuffleWrite(eid) =
-            executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
-        }
-      }
-    }
-  }
-
-  // This addresses executor ID inconsistencies in the local mode
-  private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 73861ae..c83e196 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -20,11 +20,12 @@ package org.apache.spark.ui.jobs
 import scala.collection.mutable
 import scala.xml.Node
 
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
 /** Page showing executor summary */
-private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
-  private lazy val listener = parent.listener
+private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
+  private val listener = parent.listener
 
   def toNodeSeq: Seq[Node] = {
     listener.synchronized {
@@ -69,7 +70,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
           <tr>
             <td>{k}</td>
             <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
-            <td>{parent.formatDuration(v.taskTime)}</td>
+            <td>{UIUtils.formatDuration(v.taskTime)}</td>
             <td>{v.failedTasks + v.succeededTasks}</td>
             <td>{v.failedTasks}</td>
             <td>{v.succeededTasks}</td>

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
deleted file mode 100644
index 8619a31..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.{Node, NodeSeq}
-
-import org.apache.spark.scheduler.Schedulable
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
-
-/** Page showing list of all ongoing and recently finished stages and pools */
-private[ui] class IndexPage(parent: JobProgressUI) {
-  private val basePath = parent.basePath
-  private val live = parent.live
-  private val sc = parent.sc
-  private lazy val listener = parent.listener
-  private lazy val isFairScheduler = parent.isFairScheduler
-
-  private def appName = parent.appName
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    listener.synchronized {
-      val activeStages = listener.activeStages.values.toSeq
-      val completedStages = listener.completedStages.reverse.toSeq
-      val failedStages = listener.failedStages.reverse.toSeq
-      val now = System.currentTimeMillis()
-
-      val activeStagesTable =
-        new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
-      val completedStagesTable =
-        new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
-      val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
-
-      // For now, pool information is only accessible in live UIs
-      val pools = if (live) sc.getAllPools else Seq[Schedulable]()
-      val poolTable = new PoolTable(pools, parent)
-
-      val summary: NodeSeq =
-        <div>
-          <ul class="unstyled">
-            {if (live) {
-              // Total duration is not meaningful unless the UI is live
-              <li>
-                <strong>Total Duration: </strong>
-                {parent.formatDuration(now - sc.startTime)}
-              </li>
-            }}
-            <li>
-              <strong>Scheduling Mode: </strong>
-              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
-            </li>
-            <li>
-              <a href="#active"><strong>Active Stages:</strong></a>
-              {activeStages.size}
-            </li>
-            <li>
-              <a href="#completed"><strong>Completed Stages:</strong></a>
-              {completedStages.size}
-            </li>
-             <li>
-             <a href="#failed"><strong>Failed Stages:</strong></a>
-              {failedStages.size}
-            </li>
-          </ul>
-        </div>
-
-      val content = summary ++
-        {if (live && isFairScheduler) {
-          <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
-        } else {
-          Seq[Node]()
-        }} ++
-        <h4 id="active">Active Stages ({activeStages.size})</h4> ++
-        activeStagesTable.toNodeSeq ++
-        <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
-        completedStagesTable.toNodeSeq ++
-        <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
-        failedStagesTable.toNodeSeq
-
-      UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", Stages)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 5167e20..0db4afa 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -222,12 +222,10 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
 
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
-      val schedulingModeName =
-        environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode")
-      schedulingMode = schedulingModeName match {
-        case Some(name) => Some(SchedulingMode.withName(name))
-        case None => None
-      }
+      schedulingMode = environmentUpdate
+        .environmentDetails("Spark Properties").toMap
+        .get("spark.scheduler.mode")
+        .map(SchedulingMode.withName)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
new file mode 100644
index 0000000..34ff2ac
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, NodeSeq}
+
+import org.apache.spark.scheduler.Schedulable
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+
+/** Page showing list of all ongoing and recently finished stages and pools */
+private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
+  private val appName = parent.appName
+  private val basePath = parent.basePath
+  private val live = parent.live
+  private val sc = parent.sc
+  private val listener = parent.listener
+  private lazy val isFairScheduler = parent.isFairScheduler
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    listener.synchronized {
+      val activeStages = listener.activeStages.values.toSeq
+      val completedStages = listener.completedStages.reverse.toSeq
+      val failedStages = listener.failedStages.reverse.toSeq
+      val now = System.currentTimeMillis
+
+      val activeStagesTable =
+        new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
+      val completedStagesTable =
+        new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
+      val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
+
+      // For now, pool information is only accessible in live UIs
+      val pools = if (live) sc.getAllPools else Seq[Schedulable]()
+      val poolTable = new PoolTable(pools, parent)
+
+      val summary: NodeSeq =
+        <div>
+          <ul class="unstyled">
+            {if (live) {
+              // Total duration is not meaningful unless the UI is live
+              <li>
+                <strong>Total Duration: </strong>
+                {UIUtils.formatDuration(now - sc.startTime)}
+              </li>
+            }}
+            <li>
+              <strong>Scheduling Mode: </strong>
+              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
+            </li>
+            <li>
+              <a href="#active"><strong>Active Stages:</strong></a>
+              {activeStages.size}
+            </li>
+            <li>
+              <a href="#completed"><strong>Completed Stages:</strong></a>
+              {completedStages.size}
+            </li>
+            <li>
+              <a href="#failed"><strong>Failed Stages:</strong></a>
+              {failedStages.size}
+            </li>
+          </ul>
+        </div>
+
+      val content = summary ++
+        {if (live && isFairScheduler) {
+          <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
+        } else {
+          Seq[Node]()
+        }} ++
+        <h4 id="active">Active Stages ({activeStages.size})</h4> ++
+        activeStagesTable.toNodeSeq ++
+        <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
+        completedStagesTable.toNodeSeq ++
+        <h4 id="failed">Failed Stages ({failedStages.size})</h4> ++
+        failedStagesTable.toNodeSeq
+
+      UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", parent.headerTabs, parent)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
new file mode 100644
index 0000000..3308c8c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.ui.{SparkUI, WebUITab}
+
+/** Web UI showing progress status of all jobs in the given SparkContext. */
+private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") {
+  val appName = parent.appName
+  val basePath = parent.basePath
+  val live = parent.live
+  val sc = parent.sc
+  val conf = if (live) sc.conf else new SparkConf
+  val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
+  val listener = new JobProgressListener(conf)
+
+  attachPage(new JobProgressPage(this))
+  attachPage(new StagePage(this))
+  attachPage(new PoolPage(this))
+  parent.registerListener(listener)
+
+  def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
+
+  def handleKillRequest(request: HttpServletRequest) = {
+    if (killEnabled) {
+      val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+      val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
+      if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
+        sc.cancelStage(stageId)
+      }
+      // Do a quick pause here to give Spark time to kill the stage so it shows up as
+      // killed after the refresh. Note that this will block the serving thread so the
+      // time should be limited in duration.
+      Thread.sleep(100)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
deleted file mode 100644
index 30e3f35..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
-
-/** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobProgressUI(parent: SparkUI) {
-  val basePath = parent.basePath
-  val live = parent.live
-  val sc = parent.sc
-  val killEnabled = parent.conf.getBoolean("spark.ui.killEnabled", true)
-
-  lazy val listener = _listener.get
-  lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
-
-  private val indexPage = new IndexPage(this)
-  private val stagePage = new StagePage(this)
-  private val poolPage = new PoolPage(this)
-  private var _listener: Option[JobProgressListener] = None
-
-  def appName = parent.appName
-
-  def start() {
-    val conf = if (live) sc.conf else new SparkConf
-    _listener = Some(new JobProgressListener(conf))
-  }
-
-  def formatDuration(ms: Long) = Utils.msDurationToString(ms)
-
-  private def handleKillRequest(request: HttpServletRequest) =  {
-    if (killEnabled) {
-      val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
-      val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
-      if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
-        sc.cancelStage(stageId)
-      }
-      // Do a quick pause here to give Spark time to kill the stage so it shows up as
-      // killed after the refresh. Note that this will block the serving thread so the
-      // time should be limited in duration.
-      Thread.sleep(100)
-    }
-  }
-
-  def getHandlers = Seq[ServletContextHandler](
-    createRedirectHandler("/stages/stage/kill", "/stages", handleKillRequest),
-    createServletHandler("/stages/stage",
-      (request: HttpServletRequest) => stagePage.render(request), parent.securityManager, basePath),
-    createServletHandler("/stages/pool",
-      (request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
-    createServletHandler("/stages",
-      (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
-  )
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 3638e60..fd83d37 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -22,17 +22,15 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.scheduler.{Schedulable, StageInfo}
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 
 /** Page showing specific pool details */
-private[ui] class PoolPage(parent: JobProgressUI) {
+private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
+  private val appName = parent.appName
   private val basePath = parent.basePath
   private val live = parent.live
   private val sc = parent.sc
-  private lazy val listener = parent.listener
-
-  private def appName = parent.appName
+  private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
@@ -52,8 +50,8 @@ private[ui] class PoolPage(parent: JobProgressUI) {
         <h4>Summary </h4> ++ poolTable.toNodeSeq ++
         <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq
 
-      UIUtils.headerSparkPage(
-        content, basePath, appName, "Fair Scheduler Pool: " + poolName, Stages)
+      UIUtils.headerSparkPage(content, basePath, appName, "Fair Scheduler Pool: " + poolName,
+        parent.headerTabs, parent)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index c5c8d86..f4b68f2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -24,10 +24,9 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo}
 import org.apache.spark.ui.UIUtils
 
 /** Table showing list of pools */
-private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
+private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
   private val basePath = parent.basePath
-  private val poolToActiveStages = listener.poolToActiveStages
-  private lazy val listener = parent.listener
+  private val listener = parent.listener
 
   def toNodeSeq: Seq[Node] = {
     listener.synchronized {
@@ -48,7 +47,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
         <th>SchedulingMode</th>
       </thead>
       <tbody>
-        {rows.map(r => makeRow(r, poolToActiveStages))}
+        {rows.map(r => makeRow(r, listener.poolToActiveStages))}
       </tbody>
     </table>
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index b6c3e3c..4bce472 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -22,17 +22,14 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.{Utils, Distribution}
 
 /** Page showing statistics and task list for a given stage */
-private[ui] class StagePage(parent: JobProgressUI) {
+private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
+  private val appName = parent.appName
   private val basePath = parent.basePath
-  private lazy val listener = parent.listener
-  private lazy val sc = parent.sc
-
-  private def appName = parent.appName
+  private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
@@ -44,8 +41,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
             <h4>Summary Metrics</h4> No tasks have started yet
             <h4>Tasks</h4> No tasks have started yet
           </div>
-        return UIUtils.headerSparkPage(
-          content, basePath, appName, "Details for Stage %s".format(stageId), Stages)
+        return UIUtils.headerSparkPage(content, basePath, appName,
+          "Details for Stage %s".format(stageId), parent.headerTabs, parent)
       }
 
       val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
@@ -60,7 +57,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
       val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
 
       var activeTime = 0L
-      val now = System.currentTimeMillis()
+      val now = System.currentTimeMillis
       val tasksActive = listener.stageIdToTasksActive(stageId).values
       tasksActive.foreach(activeTime += _.timeRunning(now))
 
@@ -70,7 +67,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
           <ul class="unstyled">
             <li>
               <strong>Total task time across all tasks: </strong>
-              {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+              {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
             </li>
             {if (hasShuffleRead)
               <li>
@@ -121,13 +118,13 @@ private[ui] class StagePage(parent: JobProgressUI) {
           }
           val serializationQuantiles =
             "Result serialization time" +: Distribution(serializationTimes).
-              get.getQuantiles().map(ms => parent.formatDuration(ms.toLong))
+              get.getQuantiles().map(ms => UIUtils.formatDuration(ms.toLong))
 
           val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
             metrics.get.executorRunTime.toDouble
           }
           val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles()
-            .map(ms => parent.formatDuration(ms.toLong))
+            .map(ms => UIUtils.formatDuration(ms.toLong))
 
           val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
             if (info.gettingResultTime > 0) {
@@ -138,7 +135,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
           }
           val gettingResultQuantiles = "Time spent fetching task results" +:
             Distribution(gettingResultTimes).get.getQuantiles().map { millis =>
-              parent.formatDuration(millis.toLong)
+              UIUtils.formatDuration(millis.toLong)
             }
           // The scheduler delay includes the network delay to send the task to the worker
           // machine and to send back the result (but not the time to fetch the task result,
@@ -155,7 +152,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
           }
           val schedulerDelayQuantiles = "Scheduler delay" +:
             Distribution(schedulerDelays).get.getQuantiles().map { millis =>
-              parent.formatDuration(millis.toLong)
+              UIUtils.formatDuration(millis.toLong)
             }
 
           def getQuantileCols(data: Seq[Double]) =
@@ -206,8 +203,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
         <h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
         <h4>Tasks</h4> ++ taskTable
 
-      UIUtils.headerSparkPage(
-        content, basePath, appName, "Details for Stage %d".format(stageId), Stages)
+      UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId),
+        parent.headerTabs, parent)
     }
   }
 
@@ -219,8 +216,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
     taskData match { case TaskUIData(info, metrics, exception) =>
       val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
         else metrics.map(_.executorRunTime).getOrElse(1L)
-      val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
-        else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+      val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
+        else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
       val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
       val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
 
@@ -235,8 +232,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
 
       val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
       val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
-      val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms =>
-        if (ms == 0) "" else parent.formatDuration(ms)
+      val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
+        if (ms == 0) "" else UIUtils.formatDuration(ms)
       }.getOrElse("")
 
       val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
@@ -254,15 +251,15 @@ private[ui] class StagePage(parent: JobProgressUI) {
         <td>{info.status}</td>
         <td>{info.taskLocality}</td>
         <td>{info.host}</td>
-        <td>{WebUI.formatDate(new Date(info.launchTime))}</td>
+        <td>{UIUtils.formatDate(new Date(info.launchTime))}</td>
         <td sorttable_customkey={duration.toString}>
           {formatDuration}
         </td>
         <td sorttable_customkey={gcTime.toString}>
-          {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+          {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
         </td>
         <td sorttable_customkey={serializationTime.toString}>
-          {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+          {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
         </td>
         {if (shuffleRead) {
            <td sorttable_customkey={shuffleReadSortable}>

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index e419fae..8c5b1f5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -23,17 +23,17 @@ import scala.collection.mutable.HashMap
 import scala.xml.Node
 
 import org.apache.spark.scheduler.{StageInfo, TaskInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished stages */
 private[ui] class StageTable(
-  stages: Seq[StageInfo],
-  parent: JobProgressUI,
-  killEnabled: Boolean = false) {
+    stages: Seq[StageInfo],
+    parent: JobProgressTab,
+    killEnabled: Boolean = false) {
 
   private val basePath = parent.basePath
-  private lazy val listener = parent.listener
+  private val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
 
   def toNodeSeq: Seq[Node] = {
@@ -89,25 +89,23 @@ private[ui] class StageTable(
         {s.name}
       </a>
 
-    val description = listener.stageIdToDescription.get(s.stageId)
+    listener.stageIdToDescription.get(s.stageId)
       .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
       .getOrElse(<div> {killLink}{nameLink}</div>)
-
-    return description
   }
 
   /** Render an HTML row that represents a stage */
   private def stageRow(s: StageInfo): Seq[Node] = {
     val poolName = listener.stageIdToPool.get(s.stageId)
     val submissionTime = s.submissionTime match {
-      case Some(t) => WebUI.formatDate(new Date(t))
+      case Some(t) => UIUtils.formatDate(new Date(t))
       case None => "Unknown"
     }
     val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
     val duration = s.submissionTime.map { t =>
       if (finishTime > t) finishTime - t else System.currentTimeMillis - t
     }
-    val formattedDuration = duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")
+    val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
     val startedTasks =
       listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
     val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
deleted file mode 100644
index 16996a2..0000000
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.storage
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.collection.mutable
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.scheduler._
-import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
-
-/** Web UI showing storage status of all RDD's in the given SparkContext. */
-private[ui] class BlockManagerUI(parent: SparkUI) {
-  val basePath = parent.basePath
-
-  private val indexPage = new IndexPage(this)
-  private val rddPage = new RDDPage(this)
-  private var _listener: Option[BlockManagerListener] = None
-
-  lazy val listener = _listener.get
-
-  def appName = parent.appName
-
-  def start() {
-    _listener = Some(new BlockManagerListener(parent.storageStatusListener))
-  }
-
-  def getHandlers = Seq[ServletContextHandler](
-    createServletHandler("/storage/rdd",
-      (request: HttpServletRequest) => rddPage.render(request), parent.securityManager, basePath),
-    createServletHandler("/storage",
-      (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
-  )
-}
-
-/**
- * A SparkListener that prepares information to be displayed on the BlockManagerUI
- */
-private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
-  private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
-
-  def storageStatusList = storageStatusListener.storageStatusList
-
-  /** Filter RDD info to include only those with cached partitions */
-  def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
-
-  /** Update each RDD's info to reflect any updates to the RDD's storage status */
-  private def updateRDDInfo() {
-    val rddInfos = _rddInfoMap.values.toSeq
-    val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
-    updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
-  }
-
-  /**
-   * Assumes the storage status list is fully up-to-date. This implies the corresponding
-   * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
-   */
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val metrics = taskEnd.taskMetrics
-    if (metrics != null && metrics.updatedBlocks.isDefined) {
-      updateRDDInfo()
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
-    val rddInfo = stageSubmitted.stageInfo.rddInfo
-    _rddInfoMap.getOrElseUpdate(rddInfo.id, rddInfo)
-  }
-
-  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
-    // Remove all partitions that are no longer cached
-    _rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 }
-  }
-
-  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
-    updateRDDInfo()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
deleted file mode 100644
index 4f6acc3..0000000
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.storage
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
-import org.apache.spark.util.Utils
-
-/** Page showing list of RDD's currently stored in the cluster */
-private[ui] class IndexPage(parent: BlockManagerUI) {
-  private val basePath = parent.basePath
-  private lazy val listener = parent.listener
-
-  private def appName = parent.appName
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-
-    val rdds = listener.rddInfoList
-    val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
-    UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage)
-  }
-
-  /** Header fields for the RDD table */
-  private def rddHeader = Seq(
-    "RDD Name",
-    "Storage Level",
-    "Cached Partitions",
-    "Fraction Cached",
-    "Size in Memory",
-    "Size in Tachyon",
-    "Size on Disk")
-
-  /** Render an HTML row representing an RDD */
-  private def rddRow(rdd: RDDInfo): Seq[Node] = {
-    <tr>
-      <td>
-        <a href={"%s/storage/rdd?id=%s".format(UIUtils.prependBaseUri(basePath), rdd.id)}>
-          {rdd.name}
-        </a>
-      </td>
-      <td>{rdd.storageLevel.description}
-      </td>
-      <td>{rdd.numCachedPartitions}</td>
-      <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
-      <td>{Utils.bytesToString(rdd.memSize)}</td>
-      <td>{Utils.bytesToString(rdd.tachyonSize)}</td>
-      <td>{Utils.bytesToString(rdd.diskSize)}</td>
-    </tr>
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 75ee997..d07f1c9 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -22,23 +22,22 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils}
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
 /** Page showing storage details for a given RDD */
-private[ui] class RDDPage(parent: BlockManagerUI) {
+private[ui] class RddPage(parent: StorageTab) extends WebUIPage("rdd") {
+  private val appName = parent.appName
   private val basePath = parent.basePath
-  private lazy val listener = parent.listener
-
-  private def appName = parent.appName
+  private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val rddId = request.getParameter("id").toInt
     val storageStatusList = listener.storageStatusList
     val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
       // Rather than crashing, render an "RDD Not Found" page
-      return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", Storage)
+      return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found",
+        parent.headerTabs, parent)
     }
 
     // Worker table
@@ -96,8 +95,8 @@ private[ui] class RDDPage(parent: BlockManagerUI) {
         </div>
       </div>;
 
-    UIUtils.headerSparkPage(
-      content, basePath, appName, "RDD Storage Info for " + rddInfo.name, Storage)
+    UIUtils.headerSparkPage(content, basePath, appName, "RDD Storage Info for " + rddInfo.name,
+      parent.headerTabs, parent)
   }
 
   /** Header fields for the worker table */

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
new file mode 100644
index 0000000..b66edd9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+/** Page showing list of RDD's currently stored in the cluster */
+private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
+  private val appName = parent.appName
+  private val basePath = parent.basePath
+  private val listener = parent.listener
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val rdds = listener.rddInfoList
+    val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
+    UIUtils.headerSparkPage(content, basePath, appName, "Storage ", parent.headerTabs, parent)
+  }
+
+  /** Header fields for the RDD table */
+  private def rddHeader = Seq(
+    "RDD Name",
+    "Storage Level",
+    "Cached Partitions",
+    "Fraction Cached",
+    "Size in Memory",
+    "Size in Tachyon",
+    "Size on Disk")
+
+  /** Render an HTML row representing an RDD */
+  private def rddRow(rdd: RDDInfo): Seq[Node] = {
+    <tr>
+      <td>
+        <a href={"%s/storage/rdd?id=%s".format(UIUtils.prependBaseUri(basePath), rdd.id)}>
+          {rdd.name}
+        </a>
+      </td>
+      <td>{rdd.storageLevel.description}
+      </td>
+      <td>{rdd.numCachedPartitions}</td>
+      <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
+      <td>{Utils.bytesToString(rdd.memSize)}</td>
+      <td>{Utils.bytesToString(rdd.tachyonSize)}</td>
+      <td>{Utils.bytesToString(rdd.diskSize)}</td>
+    </tr>
+  }
+}


[3/3] git commit: [SPARK-1386] Web UI for Spark Streaming

Posted by pw...@apache.org.
[SPARK-1386] Web UI for Spark Streaming

When debugging Spark Streaming applications it is necessary to monitor certain metrics that are not shown in the Spark application UI. For example, what is the average processing time of batches? What is the scheduling delay? Is the system able to process data as fast as it is receiving it? How many records am I receiving through my receivers?

While the StreamingListener interface introduced in 0.9 provided some of this information, it could only be accessed programmatically. A UI that shows information specific to streaming applications is necessary for easier debugging. This PR introduces such a UI. It shows various statistics related to the streaming application. Here is a screenshot of the UI running on my local machine.

http://i.imgur.com/1ooDGhm.png

This UI is integrated into the Spark UI running at 4040.
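
For reference, here is a minimal sketch of the programmatic route mentioned above: a custom StreamingListener that reports the same processing-time and scheduling-delay metrics. It assumes the 0.9-era listener API; the class name BatchStatsListener and the surrounding ssc (StreamingContext) are illustrative, not part of this patch.

  import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

  // Illustrative listener that prints the per-batch metrics the new UI also displays.
  class BatchStatsListener extends StreamingListener {
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
      val info = batchCompleted.batchInfo
      // schedulingDelay: how long the batch waited before processing started (ms)
      // processingDelay: how long the batch took to process once started (ms)
      val scheduling = info.schedulingDelay.getOrElse(-1L)
      val processing = info.processingDelay.getOrElse(-1L)
      println("Batch " + info.batchTime + ": scheduling delay = " + scheduling +
        " ms, processing time = " + processing + " ms")
    }
  }

  // Registered on an existing StreamingContext (here called ssc):
  //   ssc.addStreamingListener(new BatchStatsListener)

The StreamingJobProgressListener added by this patch gathers essentially the same per-batch information and renders it in the new tab instead of requiring such hand-written listeners.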

Author: Tathagata Das <ta...@gmail.com>
Author: Andrew Or <an...@gmail.com>

Closes #290 from tdas/streaming-web-ui and squashes the following commits:

fc73ca5 [Tathagata Das] Merge pull request #9 from andrewor14/ui-refactor
642dd88 [Andrew Or] Merge SparkUISuite.scala into UISuite.scala
eb30517 [Andrew Or] Merge github.com:apache/spark into ui-refactor
f4f4cbe [Tathagata Das] More minor fixes.
34bb364 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
252c566 [Tathagata Das] Merge pull request #8 from andrewor14/ui-refactor
e038b4b [Tathagata Das] Addressed Patrick's comments.
125a054 [Andrew Or] Disable serving static resources with gzip
90feb8d [Andrew Or] Address Patrick's comments
89dae36 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
72fe256 [Tathagata Das] Merge pull request #6 from andrewor14/ui-refactor
2fc09c8 [Tathagata Das] Added binary check exclusions
aa396d4 [Andrew Or] Rename tabs and pages (No more IndexPage.scala)
f8e1053 [Tathagata Das] Added Spark and Streaming UI unit tests.
caa5e05 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
585cd65 [Tathagata Das] Merge pull request #5 from andrewor14/ui-refactor
914b8ff [Tathagata Das] Moved utils functions to UIUtils.
548c98c [Andrew Or] Wide refactoring of WebUI, UITab, and UIPage (see commit message)
6de06b0 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
ee6543f [Tathagata Das] Minor changes based on Andrew's comments.
fa760fe [Tathagata Das] Fixed long line.
1c0bcef [Tathagata Das] Refactored streaming UI into two files.
1af239b [Tathagata Das] Changed streaming UI to attach itself as a tab with the Spark UI.
827e81a [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
168fe86 [Tathagata Das] Merge pull request #2 from andrewor14/ui-refactor
3e986f8 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
c78c92d [Andrew Or] Remove outdated comment
8f7323b [Andrew Or] End of file new lines, indentation, and imports (minor)
0d61ee8 [Andrew Or] Merge branch 'streaming-web-ui' of github.com:tdas/spark into ui-refactor
9a48fa1 [Andrew Or] Allow adding tabs to SparkUI dynamically + add example
61358e3 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-web-ui
53be2c5 [Tathagata Das] Minor style updates.
ed25dfc [Andrew Or] Generalize SparkUI header to display tabs dynamically
a37ad4f [Andrew Or] Comments, imports and formatting (minor)
cd000b0 [Andrew Or] Merge github.com:apache/spark into ui-refactor
7d57444 [Andrew Or] Refactoring the UI interface to add flexibility
aef4dd5 [Tathagata Das] Added Apache licenses.
db27bad [Tathagata Das] Added last batch processing time to StreamingUI.
4d86e98 [Tathagata Das] Added basic stats to the StreamingUI and refactored the UI to a Page to make it easier to transition to using SparkUI later.
93f1c69 [Tathagata Das] Added network receiver information to the Streaming UI.
56cc7fb [Tathagata Das] First cut implementation of Streaming UI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6aa08c39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6aa08c39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6aa08c39

Branch: refs/heads/master
Commit: 6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6
Parents: 165e06a
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Apr 11 23:33:49 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Fri Apr 11 23:33:49 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   1 -
 .../apache/spark/deploy/SparkUIContainer.scala  |  50 -----
 .../spark/deploy/history/HistoryPage.scala      |  82 +++++++
 .../spark/deploy/history/HistoryServer.scala    |  61 +++---
 .../apache/spark/deploy/history/IndexPage.scala |  82 -------
 .../org/apache/spark/deploy/master/Master.scala |   8 +-
 .../deploy/master/ui/ApplicationPage.scala      |  13 +-
 .../spark/deploy/master/ui/IndexPage.scala      | 195 -----------------
 .../spark/deploy/master/ui/MasterPage.scala     | 194 +++++++++++++++++
 .../spark/deploy/master/ui/MasterWebUI.scala    |  54 ++---
 .../org/apache/spark/deploy/worker/Worker.scala |   2 +-
 .../spark/deploy/worker/ui/IndexPage.scala      | 165 --------------
 .../apache/spark/deploy/worker/ui/LogPage.scala | 147 +++++++++++++
 .../spark/deploy/worker/ui/WorkerPage.scala     | 165 ++++++++++++++
 .../spark/deploy/worker/ui/WorkerWebUI.scala    | 180 +++-------------
 .../scheduler/ApplicationEventListener.scala    |   4 +-
 .../org/apache/spark/storage/StorageUtils.scala |  16 +-
 .../scala/org/apache/spark/ui/JettyUtils.scala  |   1 +
 .../main/scala/org/apache/spark/ui/Page.scala   |  22 --
 .../scala/org/apache/spark/ui/SparkUI.scala     | 108 ++++------
 .../scala/org/apache/spark/ui/UIUtils.scala     | 172 +++++++++++----
 .../main/scala/org/apache/spark/ui/WebUI.scala  | 141 +++++++++---
 .../apache/spark/ui/env/EnvironmentPage.scala   |  56 +++++
 .../apache/spark/ui/env/EnvironmentTab.scala    |  50 +++++
 .../org/apache/spark/ui/env/EnvironmentUI.scala |  93 --------
 .../apache/spark/ui/exec/ExecutorsPage.scala    | 141 ++++++++++++
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |  86 ++++++++
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  | 213 -------------------
 .../apache/spark/ui/jobs/ExecutorTable.scala    |   7 +-
 .../org/apache/spark/ui/jobs/IndexPage.scala    | 100 ---------
 .../spark/ui/jobs/JobProgressListener.scala     |  10 +-
 .../apache/spark/ui/jobs/JobProgressPage.scala  |  98 +++++++++
 .../apache/spark/ui/jobs/JobProgressTab.scala   |  56 +++++
 .../apache/spark/ui/jobs/JobProgressUI.scala    |  77 -------
 .../org/apache/spark/ui/jobs/PoolPage.scala     |  14 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala    |   7 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  45 ++--
 .../org/apache/spark/ui/jobs/StageTable.scala   |  18 +-
 .../spark/ui/storage/BlockManagerUI.scala       |  99 ---------
 .../org/apache/spark/ui/storage/IndexPage.scala |  70 ------
 .../org/apache/spark/ui/storage/RDDPage.scala   |  17 +-
 .../apache/spark/ui/storage/StoragePage.scala   |  67 ++++++
 .../apache/spark/ui/storage/StorageTab.scala    |  81 +++++++
 .../org/apache/spark/util/JsonProtocol.scala    |  12 +-
 .../scala/org/apache/spark/SparkUISuite.scala   |  35 ---
 .../scala/org/apache/spark/ui/UISuite.scala     |  81 ++++++-
 .../apache/spark/util/JsonProtocolSuite.scala   |   4 +-
 project/MimaBuild.scala                         |   8 +-
 .../spark/streaming/StreamingContext.scala      |  23 +-
 .../spark/streaming/dstream/DStream.scala       |   9 -
 .../streaming/dstream/NetworkInputDStream.scala |  79 +++++--
 .../spark/streaming/scheduler/BatchInfo.scala   |   1 +
 .../streaming/scheduler/JobGenerator.scala      |   9 +-
 .../streaming/scheduler/JobScheduler.scala      |  11 +-
 .../spark/streaming/scheduler/JobSet.scala      |   7 +-
 .../scheduler/NetworkInputTracker.scala         |  86 +++++---
 .../streaming/scheduler/StreamingListener.scala |  18 +-
 .../scheduler/StreamingListenerBus.scala        |   4 +
 .../ui/StreamingJobProgressListener.scala       | 148 +++++++++++++
 .../spark/streaming/ui/StreamingPage.scala      | 180 ++++++++++++++++
 .../spark/streaming/ui/StreamingTab.scala       |  36 ++++
 .../spark/streaming/InputStreamsSuite.scala     |   6 +-
 .../spark/streaming/StreamingContextSuite.scala |   1 -
 .../org/apache/spark/streaming/UISuite.scala    |  46 ++++
 64 files changed, 2326 insertions(+), 1746 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3bcc8ce..a764c17 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -213,7 +213,6 @@ class SparkContext(config: SparkConf) extends Logging {
   // Initialize the Spark UI, registering all associated listeners
   private[spark] val ui = new SparkUI(this)
   ui.bind()
-  ui.start()
 
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
deleted file mode 100644
index 33fceae..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy
-
-import org.apache.spark.ui.{SparkUI, WebUI}
-
-private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) {
-
-  /** Attach a SparkUI to this container. Only valid after bind(). */
-  def attachUI(ui: SparkUI) {
-    assert(serverInfo.isDefined,
-      "%s must be bound to a server before attaching SparkUIs".format(name))
-    val rootHandler = serverInfo.get.rootHandler
-    for (handler <- ui.handlers) {
-      rootHandler.addHandler(handler)
-      if (!handler.isStarted) {
-        handler.start()
-      }
-    }
-  }
-
-  /** Detach a SparkUI from this container. Only valid after bind(). */
-  def detachUI(ui: SparkUI) {
-    assert(serverInfo.isDefined,
-      "%s must be bound to a server before detaching SparkUIs".format(name))
-    val rootHandler = serverInfo.get.rootHandler
-    for (handler <- ui.handlers) {
-      if (handler.isStarted) {
-        handler.stop()
-      }
-      rootHandler.removeHandler(handler)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
new file mode 100644
index 0000000..180c853
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+
+private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
+    val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
+    val content =
+      <div class="row-fluid">
+        <div class="span12">
+          <ul class="unstyled">
+            <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
+          </ul>
+          {
+            if (parent.appIdToInfo.size > 0) {
+              <h4>
+                Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
+                Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+              </h4> ++
+              appTable
+            } else {
+              <h4>No Completed Applications Found</h4>
+            }
+          }
+        </div>
+      </div>
+    UIUtils.basicSparkPage(content, "History Server")
+  }
+
+  private val appHeader = Seq(
+    "App Name",
+    "Started",
+    "Completed",
+    "Duration",
+    "Spark User",
+    "Log Directory",
+    "Last Updated")
+
+  private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
+    val appName = if (info.started) info.name else info.logDirPath.getName
+    val uiAddress = parent.getAddress + info.ui.basePath
+    val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
+    val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
+    val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
+    val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
+    val sparkUser = if (info.started) info.sparkUser else "Unknown user"
+    val logDirectory = info.logDirPath.getName
+    val lastUpdated = UIUtils.formatDate(info.lastUpdated)
+    <tr>
+      <td><a href={uiAddress}>{appName}</a></td>
+      <td>{startTime}</td>
+      <td>{endTime}</td>
+      <td>{duration}</td>
+      <td>{sparkUser}</td>
+      <td>{logDirectory}</td>
+      <td>{lastUpdated}</td>
+    </tr>
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 97d2ba9..cf64700 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,17 +17,13 @@
 
 package org.apache.spark.deploy.history
 
-import javax.servlet.http.HttpServletRequest
-
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.deploy.SparkUIContainer
 import org.apache.spark.scheduler._
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.{WebUI, SparkUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.Utils
 
@@ -46,17 +42,15 @@ import org.apache.spark.util.Utils
  */
 class HistoryServer(
     val baseLogDir: String,
+    securityManager: SecurityManager,
     conf: SparkConf)
-  extends SparkUIContainer("History Server") with Logging {
+  extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
 
   import HistoryServer._
 
   private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
   private val localHost = Utils.localHostName()
   private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
-  private val port = WEB_UI_PORT
-  private val securityManager = new SecurityManager(conf)
-  private val indexPage = new IndexPage(this)
 
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTime = -1L
@@ -90,37 +84,23 @@ class HistoryServer(
     }
   }
 
-  private val handlers = Seq[ServletContextHandler](
-    createStaticHandler(STATIC_RESOURCE_DIR, "/static"),
-    createServletHandler("/",
-      (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
-  )
-
   // A mapping of application ID to its history information, which includes the rendered UI
   val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
 
+  initialize()
+
   /**
-   * Start the history server.
+   * Initialize the history server.
    *
    * This starts a background thread that periodically synchronizes information displayed on
    * this UI with the event logs in the provided base directory.
    */
-  def start() {
+  def initialize() {
+    attachPage(new HistoryPage(this))
+    attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
     logCheckingThread.start()
   }
 
-  /** Bind to the HTTP server behind this web interface. */
-  override def bind() {
-    try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
-      logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
-    } catch {
-      case e: Exception =>
-        logError("Failed to bind HistoryServer", e)
-        System.exit(1)
-    }
-  }
-
   /**
    * Check for any updates to event logs in the base directory. This is only effective once
    * the server has been bound.
@@ -151,7 +131,7 @@ class HistoryServer(
         // Remove any applications that should no longer be retained
         appIdToInfo.foreach { case (appId, info) =>
           if (!retainedAppIds.contains(appId)) {
-            detachUI(info.ui)
+            detachSparkUI(info.ui)
             appIdToInfo.remove(appId)
           }
         }
@@ -186,15 +166,14 @@ class HistoryServer(
     val path = logDir.getPath
     val appId = path.getName
     val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
-    val ui = new SparkUI(replayBus, appId, "/history/" + appId)
     val appListener = new ApplicationEventListener
     replayBus.addListener(appListener)
+    val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)
 
     // Do not call ui.bind() to avoid creating a new server for each application
-    ui.start()
     replayBus.replay()
     if (appListener.applicationStarted) {
-      attachUI(ui)
+      attachSparkUI(ui)
       val appName = appListener.appName
       val sparkUser = appListener.sparkUser
       val startTime = appListener.startTime
@@ -213,6 +192,18 @@ class HistoryServer(
     fileSystem.close()
   }
 
+  /** Attach a reconstructed UI to this server. Only valid after bind(). */
+  private def attachSparkUI(ui: SparkUI) {
+    assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
+    ui.getHandlers.foreach(attachHandler)
+  }
+
+  /** Detach a reconstructed UI from this server. Only valid after bind(). */
+  private def detachSparkUI(ui: SparkUI) {
+    assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs")
+    ui.getHandlers.foreach(detachHandler)
+  }
+
   /** Return the address of this server. */
   def getAddress: String = "http://" + publicHost + ":" + boundPort
 
@@ -262,9 +253,9 @@ object HistoryServer {
 
   def main(argStrings: Array[String]) {
     val args = new HistoryServerArguments(argStrings)
-    val server = new HistoryServer(args.logDir, conf)
+    val securityManager = new SecurityManager(conf)
+    val server = new HistoryServer(args.logDir, securityManager, conf)
     server.bind()
-    server.start()
 
     // Wait until the end of the world... or if the HistoryServer process is manually stopped
     while(true) { Thread.sleep(Int.MaxValue) }

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
deleted file mode 100644
index 54dffff..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.history
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.apache.spark.ui.{UIUtils, WebUI}
-
-private[spark] class IndexPage(parent: HistoryServer) {
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
-    val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
-    val content =
-      <div class="row-fluid">
-        <div class="span12">
-          <ul class="unstyled">
-            <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
-          </ul>
-          {
-            if (parent.appIdToInfo.size > 0) {
-              <h4>
-                Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
-                Completed Application{if (parent.getNumApplications > 1) "s" else ""}
-              </h4> ++
-              appTable
-            } else {
-              <h4>No Completed Applications Found</h4>
-            }
-          }
-        </div>
-      </div>
-    UIUtils.basicSparkPage(content, "History Server")
-  }
-
-  private val appHeader = Seq(
-    "App Name",
-    "Started",
-    "Completed",
-    "Duration",
-    "Spark User",
-    "Log Directory",
-    "Last Updated")
-
-  private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    val appName = if (info.started) info.name else info.logDirPath.getName
-    val uiAddress = parent.getAddress + info.ui.basePath
-    val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
-    val endTime = if (info.completed) WebUI.formatDate(info.endTime) else "Not completed"
-    val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
-    val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
-    val sparkUser = if (info.started) info.sparkUser else "Unknown user"
-    val logDirectory = info.logDirPath.getName
-    val lastUpdated = WebUI.formatDate(info.lastUpdated)
-    <tr>
-      <td><a href={uiAddress}>{appName}</a></td>
-      <td>{startTime}</td>
-      <td>{endTime}</td>
-      <td>{duration}</td>
-      <td>{sparkUser}</td>
-      <td>{logDirectory}</td>
-      <td>{lastUpdated}</td>
-    </tr>
-  }
-}
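
The deleted page above (and its replacements elsewhere in this patch) builds its tables by pairing a header sequence with a per-row render function. A rough sketch of that header-plus-row-function idiom in plain scala.xml, written against a hypothetical helper rather than the real UIUtils.listingTable:

    import scala.xml.Node

    // Hypothetical helper modelled on the headers + row-function pattern used above.
    def listingTable[T](headers: Seq[String], row: T => Seq[Node], rows: Seq[T]): Seq[Node] =
      <table class="table table-bordered table-striped">
        <thead><tr>{headers.map(h => <th>{h}</th>)}</tr></thead>
        <tbody>{rows.flatMap(r => <tr>{row(r)}</tr>)}</tbody>
      </table>

    case class AppSummary(name: String, sparkUser: String)

    // Each row function returns the <td> cells for one record.
    def appRow(app: AppSummary): Seq[Node] = <td>{app.name}</td> ++ <td>{app.sparkUser}</td>

    // listingTable(Seq("App Name", "Spark User"), appRow, Seq(AppSummary("demo", "alice")))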

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2446e86..6c58e74 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -625,7 +625,7 @@ private[spark] class Master(
       if (completedApps.size >= RETAINED_APPLICATIONS) {
         val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
         completedApps.take(toRemove).foreach( a => {
-          appIdToUI.remove(a.id).foreach { ui => webUi.detachUI(ui) }
+          appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
           applicationMetricsSystem.removeSource(a.appSource)
         })
         completedApps.trimStart(toRemove)
@@ -667,12 +667,12 @@ private[spark] class Master(
     if (!eventLogPaths.isEmpty) {
       try {
         val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
-        val ui = new SparkUI(replayBus, appName + " (completed)", "/history/" + app.id)
-        ui.start()
+        val ui = new SparkUI(
+          new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
         replayBus.replay()
         app.desc.appUiUrl = ui.basePath
         appIdToUI(app.id) = ui
-        webUi.attachUI(ui)
+        webUi.attachSparkUI(ui)
         return true
       } catch {
         case t: Throwable =>
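
The rebuild path above constructs a fresh SparkUI backed by a ReplayListenerBus and then replays the application's event logs into it, so a finished application's UI is reconstructed entirely from logged events. A hedged sketch of that replay idea with made-up event and listener types (not Spark's listener bus API):

    // Made-up event/listener types; only the replay-in-order idea mirrors the patch.
    sealed trait Event
    case class AppStart(name: String, time: Long) extends Event
    case class AppEnd(time: Long) extends Event

    trait Listener { def onEvent(e: Event): Unit }

    class ReplayBusSketch(events: Seq[Event]) {
      private val listeners = scala.collection.mutable.ListBuffer[Listener]()
      def addListener(l: Listener): Unit = listeners += l
      // Replays every recorded event to every registered listener, in order.
      def replay(): Unit = for (e <- events; l <- listeners) l.onEvent(e)
    }

    class AppInfoListener extends Listener {
      var name: Option[String] = None
      def onEvent(e: Event): Unit = e match {
        case AppStart(n, _) => name = Some(n)
        case AppEnd(_)      => ()
      }
    }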

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index cb092cb..b5cd4d2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -28,15 +28,16 @@ import org.json4s.JValue
 import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.ExecutorInfo
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
-private[spark] class ApplicationPage(parent: MasterWebUI) {
-  val master = parent.masterActorRef
-  val timeout = parent.timeout
+private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") {
+
+  private val master = parent.masterActorRef
+  private val timeout = parent.timeout
 
   /** Executor details for a particular application */
-  def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest): JValue = {
     val appId = request.getParameter("appId")
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
@@ -96,7 +97,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
 
-  def executorRow(executor: ExecutorInfo): Seq[Node] = {
+  private def executorRow(executor: ExecutorInfo): Seq[Node] = {
     <tr>
       <td>{executor.id}</td>
       <td>
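
ApplicationPage now extends WebUIPage("app"), so the page carries its own URL prefix and can override renderJson alongside render. A small sketch of that page abstraction with simplified stand-in types (not the real org.apache.spark.ui.WebUIPage):

    import scala.xml.Node

    // Stand-in for the page abstraction: a URL prefix plus render/renderJson.
    abstract class PageSketch(val prefix: String) {
      def render(params: Map[String, String]): Seq[Node]
      // JSON rendering is optional; pages that need it override this.
      def renderJson(params: Map[String, String]): String = "{}"
    }

    class AppPageSketch extends PageSketch("app") {
      def render(params: Map[String, String]): Seq[Node] =
        <h4>Application: {params.getOrElse("appId", "unknown")}</h4>
      override def renderJson(params: Map[String, String]): String =
        s"""{"appId": "${params.getOrElse("appId", "unknown")}"}"""
    }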

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
deleted file mode 100644
index 8c1d6c7..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.master.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.concurrent.Await
-import scala.xml.Node
-
-import akka.pattern.ask
-import org.json4s.JValue
-
-import org.apache.spark.deploy.{JsonProtocol}
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
-import org.apache.spark.util.Utils
-
-private[spark] class IndexPage(parent: MasterWebUI) {
-  val master = parent.masterActorRef
-  val timeout = parent.timeout
-
-  def renderJson(request: HttpServletRequest): JValue = {
-    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, timeout)
-    JsonProtocol.writeMasterState(state)
-  }
-
-  /** Index view listing applications and executors */
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, timeout)
-
-    val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
-    val workers = state.workers.sortBy(_.id)
-    val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
-
-    val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
-      "State", "Duration")
-    val activeApps = state.activeApps.sortBy(_.startTime).reverse
-    val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
-    val completedApps = state.completedApps.sortBy(_.endTime).reverse
-    val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
-
-    val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory",
-      "Main Class")
-    val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
-    val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
-    val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
-    val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers)
-
-    // For now we only show driver information if the user has submitted drivers to the cluster.
-    // This is until we integrate the notion of drivers and applications in the UI.
-    def hasDrivers = activeDrivers.length > 0 || completedDrivers.length > 0
-
-    val content =
-        <div class="row-fluid">
-          <div class="span12">
-            <ul class="unstyled">
-              <li><strong>URL:</strong> {state.uri}</li>
-              <li><strong>Workers:</strong> {state.workers.size}</li>
-              <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
-                {state.workers.map(_.coresUsed).sum} Used</li>
-              <li><strong>Memory:</strong>
-                {Utils.megabytesToString(state.workers.map(_.memory).sum)} Total,
-                {Utils.megabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
-              <li><strong>Applications:</strong>
-                {state.activeApps.size} Running,
-                {state.completedApps.size} Completed </li>
-              <li><strong>Drivers:</strong>
-                {state.activeDrivers.size} Running,
-                {state.completedDrivers.size} Completed </li>
-              <li><strong>Status:</strong> {state.status}</li>
-            </ul>
-          </div>
-        </div>
-
-        <div class="row-fluid">
-          <div class="span12">
-            <h4> Workers </h4>
-            {workerTable}
-          </div>
-        </div>
-
-        <div class="row-fluid">
-          <div class="span12">
-            <h4> Running Applications </h4>
-            {activeAppsTable}
-          </div>
-        </div>
-
-        <div>
-          {if (hasDrivers) {
-             <div class="row-fluid">
-               <div class="span12">
-                 <h4> Running Drivers </h4>
-                 {activeDriversTable}
-               </div>
-             </div>
-           }
-          }
-        </div>
-
-        <div class="row-fluid">
-          <div class="span12">
-            <h4> Completed Applications </h4>
-            {completedAppsTable}
-          </div>
-        </div>
-
-        <div>
-          {
-            if (hasDrivers) {
-              <div class="row-fluid">
-                <div class="span12">
-                  <h4> Completed Drivers </h4>
-                  {completedDriversTable}
-                </div>
-              </div>
-            }
-          }
-        </div>;
-
-    UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
-  }
-
-  def workerRow(worker: WorkerInfo): Seq[Node] = {
-    <tr>
-      <td>
-        <a href={worker.webUiAddress}>{worker.id}</a>
-      </td>
-      <td>{worker.host}:{worker.port}</td>
-      <td>{worker.state}</td>
-      <td>{worker.cores} ({worker.coresUsed} Used)</td>
-      <td sorttable_customkey={"%s.%s".format(worker.memory, worker.memoryUsed)}>
-        {Utils.megabytesToString(worker.memory)}
-        ({Utils.megabytesToString(worker.memoryUsed)} Used)
-      </td>
-    </tr>
-  }
-
-
-  def appRow(app: ApplicationInfo): Seq[Node] = {
-    <tr>
-      <td>
-        <a href={"app?appId=" + app.id}>{app.id}</a>
-      </td>
-      <td>
-        <a href={app.desc.appUiUrl}>{app.desc.name}</a>
-      </td>
-      <td>
-        {app.coresGranted}
-      </td>
-      <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
-        {Utils.megabytesToString(app.desc.memoryPerSlave)}
-      </td>
-      <td>{WebUI.formatDate(app.submitDate)}</td>
-      <td>{app.desc.user}</td>
-      <td>{app.state.toString}</td>
-      <td>{WebUI.formatDuration(app.duration)}</td>
-    </tr>
-  }
-
-  def driverRow(driver: DriverInfo): Seq[Node] = {
-    <tr>
-      <td>{driver.id} </td>
-      <td>{driver.submitDate}</td>
-      <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}
-      </td>
-      <td>{driver.state}</td>
-      <td sorttable_customkey={driver.desc.cores.toString}>
-        {driver.desc.cores}
-      </td>
-      <td sorttable_customkey={driver.desc.mem.toString}>
-        {Utils.megabytesToString(driver.desc.mem.toLong)}
-      </td>
-      <td>{driver.desc.command.arguments(1)}</td>
-    </tr>
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
new file mode 100644
index 0000000..7ca3b08
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.concurrent.Await
+import scala.xml.Node
+
+import akka.pattern.ask
+import org.json4s.JValue
+
+import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
+  private val master = parent.masterActorRef
+  private val timeout = parent.timeout
+
+  override def renderJson(request: HttpServletRequest): JValue = {
+    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+    val state = Await.result(stateFuture, timeout)
+    JsonProtocol.writeMasterState(state)
+  }
+
+  /** Index view listing applications and executors */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+    val state = Await.result(stateFuture, timeout)
+
+    val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
+    val workers = state.workers.sortBy(_.id)
+    val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
+
+    val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
+      "State", "Duration")
+    val activeApps = state.activeApps.sortBy(_.startTime).reverse
+    val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
+    val completedApps = state.completedApps.sortBy(_.endTime).reverse
+    val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
+
+    val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory",
+      "Main Class")
+    val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
+    val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
+    val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
+    val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers)
+
+    // For now we only show driver information if the user has submitted drivers to the cluster.
+    // This is until we integrate the notion of drivers and applications in the UI.
+    def hasDrivers = activeDrivers.length > 0 || completedDrivers.length > 0
+
+    val content =
+        <div class="row-fluid">
+          <div class="span12">
+            <ul class="unstyled">
+              <li><strong>URL:</strong> {state.uri}</li>
+              <li><strong>Workers:</strong> {state.workers.size}</li>
+              <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
+                {state.workers.map(_.coresUsed).sum} Used</li>
+              <li><strong>Memory:</strong>
+                {Utils.megabytesToString(state.workers.map(_.memory).sum)} Total,
+                {Utils.megabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
+              <li><strong>Applications:</strong>
+                {state.activeApps.size} Running,
+                {state.completedApps.size} Completed </li>
+              <li><strong>Drivers:</strong>
+                {state.activeDrivers.size} Running,
+                {state.completedDrivers.size} Completed </li>
+              <li><strong>Status:</strong> {state.status}</li>
+            </ul>
+          </div>
+        </div>
+
+        <div class="row-fluid">
+          <div class="span12">
+            <h4> Workers </h4>
+            {workerTable}
+          </div>
+        </div>
+
+        <div class="row-fluid">
+          <div class="span12">
+            <h4> Running Applications </h4>
+            {activeAppsTable}
+          </div>
+        </div>
+
+        <div>
+          {if (hasDrivers) {
+             <div class="row-fluid">
+               <div class="span12">
+                 <h4> Running Drivers </h4>
+                 {activeDriversTable}
+               </div>
+             </div>
+           }
+          }
+        </div>
+
+        <div class="row-fluid">
+          <div class="span12">
+            <h4> Completed Applications </h4>
+            {completedAppsTable}
+          </div>
+        </div>
+
+        <div>
+          {
+            if (hasDrivers) {
+              <div class="row-fluid">
+                <div class="span12">
+                  <h4> Completed Drivers </h4>
+                  {completedDriversTable}
+                </div>
+              </div>
+            }
+          }
+        </div>;
+
+    UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
+  }
+
+  private def workerRow(worker: WorkerInfo): Seq[Node] = {
+    <tr>
+      <td>
+        <a href={worker.webUiAddress}>{worker.id}</a>
+      </td>
+      <td>{worker.host}:{worker.port}</td>
+      <td>{worker.state}</td>
+      <td>{worker.cores} ({worker.coresUsed} Used)</td>
+      <td sorttable_customkey={"%s.%s".format(worker.memory, worker.memoryUsed)}>
+        {Utils.megabytesToString(worker.memory)}
+        ({Utils.megabytesToString(worker.memoryUsed)} Used)
+      </td>
+    </tr>
+  }
+
+  private def appRow(app: ApplicationInfo): Seq[Node] = {
+    <tr>
+      <td>
+        <a href={"app?appId=" + app.id}>{app.id}</a>
+      </td>
+      <td>
+        <a href={app.desc.appUiUrl}>{app.desc.name}</a>
+      </td>
+      <td>
+        {app.coresGranted}
+      </td>
+      <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
+        {Utils.megabytesToString(app.desc.memoryPerSlave)}
+      </td>
+      <td>{UIUtils.formatDate(app.submitDate)}</td>
+      <td>{app.desc.user}</td>
+      <td>{app.state.toString}</td>
+      <td>{UIUtils.formatDuration(app.duration)}</td>
+    </tr>
+  }
+
+  private def driverRow(driver: DriverInfo): Seq[Node] = {
+    <tr>
+      <td>{driver.id} </td>
+      <td>{driver.submitDate}</td>
+      <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}
+      </td>
+      <td>{driver.state}</td>
+      <td sorttable_customkey={driver.desc.cores.toString}>
+        {driver.desc.cores}
+      </td>
+      <td sorttable_customkey={driver.desc.mem.toString}>
+        {Utils.megabytesToString(driver.desc.mem.toLong)}
+      </td>
+      <td>{driver.desc.command.arguments(1)}</td>
+    </tr>
+  }
+}
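
The memory and core cells above carry a sorttable_customkey attribute so the client-side table sorter orders rows by the raw numeric value instead of the pretty-printed string. A tiny illustration of the same idea with plain scala.xml (memoryCell is a hypothetical helper):

    import scala.xml.Node

    // Sort key is the raw megabyte count; the visible text is human-readable.
    def memoryCell(megabytes: Long): Node =
      <td sorttable_customkey={megabytes.toString}>{s"${megabytes / 1024.0} GB"}</td>

    // memoryCell(2048) sorts as 2048 but displays "2.0 GB".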

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 30c8ade..a18b39f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,14 +17,9 @@
 
 package org.apache.spark.deploy.master.ui
 
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
 import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkUIContainer
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -33,44 +28,33 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 private[spark]
 class MasterWebUI(val master: Master, requestedPort: Int)
-  extends SparkUIContainer("MasterWebUI") with Logging {
+  extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
 
   val masterActorRef = master.self
   val timeout = AkkaUtils.askTimeout(master.conf)
 
-  private val host = Utils.localHostName()
-  private val port = requestedPort
-  private val applicationPage = new ApplicationPage(this)
-  private val indexPage = new IndexPage(this)
+  initialize()
 
-  private val handlers: Seq[ServletContextHandler] = {
-    master.masterMetricsSystem.getServletHandlers ++
-    master.applicationMetricsSystem.getServletHandlers ++
-    Seq[ServletContextHandler](
-      createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"),
-      createServletHandler("/app/json",
-        (request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr),
-      createServletHandler("/app",
-        (request: HttpServletRequest) => applicationPage.render(request), master.securityMgr),
-      createServletHandler("/json",
-        (request: HttpServletRequest) => indexPage.renderJson(request), master.securityMgr),
-      createServletHandler("/",
-        (request: HttpServletRequest) => indexPage.render(request), master.securityMgr)
-    )
+  /** Initialize all components of the server. */
+  def initialize() {
+    attachPage(new ApplicationPage(this))
+    attachPage(new MasterPage(this))
+    attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
+    master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
+    master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
   }
 
-  /** Bind to the HTTP server behind this web interface. */
-  override def bind() {
-    try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf))
-      logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
-    } catch {
-      case e: Exception =>
-        logError("Failed to create Master web UI", e)
-        System.exit(1)
-    }
+  /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
+  def attachSparkUI(ui: SparkUI) {
+    assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
+    ui.getHandlers.foreach(attachHandler)
   }
 
+  /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
+  def detachSparkUI(ui: SparkUI) {
+    assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
+    ui.getHandlers.foreach(detachHandler)
+  }
 }
 
 private[spark] object MasterWebUI {
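
The refactor above moves MasterWebUI onto the shared WebUI base: pages and handlers are registered in initialize(), and the underlying server is only started when bind() is called, which is why attachSparkUI/detachSparkUI can assert that serverInfo is defined. A hedged sketch of that lifecycle with stand-in types (not Spark's WebUI):

    // Stand-in for the WebUI lifecycle: register pages in initialize(), start in bind().
    abstract class WebUiSketch {
      private val pages = scala.collection.mutable.ListBuffer[String]()
      protected var bound = false

      def attachPage(prefix: String): Unit = pages += prefix
      def initialize(): Unit   // subclasses register their pages here
      def bind(): Unit = { bound = true }
      def attachedPages: Seq[String] = pages.toList
    }

    class MasterWebUiSketch extends WebUiSketch {
      def initialize(): Unit = {
        attachPage("app")   // application detail page
        attachPage("")      // index page
      }
      initialize()
    }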

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index bf5a8d0..52c164c 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -128,8 +128,8 @@ private[spark] class Worker(
       host, port, cores, Utils.megabytesToString(memory)))
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
-    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
     webUi.bind()
     registerWithMaster()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
deleted file mode 100644
index 49c1009..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.worker.ui
-
-import scala.concurrent.Await
-import scala.xml.Node
-
-import akka.pattern.ask
-import javax.servlet.http.HttpServletRequest
-import org.json4s.JValue
-
-import org.apache.spark.deploy.JsonProtocol
-import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
-import org.apache.spark.deploy.master.DriverState
-import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.ui.UIUtils
-import org.apache.spark.util.Utils
-
-private[spark] class IndexPage(parent: WorkerWebUI) {
-  val workerActor = parent.worker.self
-  val worker = parent.worker
-  val timeout = parent.timeout
-
-  def renderJson(request: HttpServletRequest): JValue = {
-    val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
-    val workerState = Await.result(stateFuture, timeout)
-    JsonProtocol.writeWorkerState(workerState)
-  }
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
-    val workerState = Await.result(stateFuture, timeout)
-
-    val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
-    val runningExecutorTable =
-      UIUtils.listingTable(executorHeaders, executorRow, workerState.executors)
-    val finishedExecutorTable =
-      UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
-
-    val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes")
-    val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
-    val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
-    val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
-    def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
-
-    // For now we only show driver information if the user has submitted drivers to the cluster.
-    // This is until we integrate the notion of drivers and applications in the UI.
-    def hasDrivers = runningDrivers.length > 0 || finishedDrivers.length > 0
-
-    val content =
-        <div class="row-fluid"> <!-- Worker Details -->
-          <div class="span12">
-            <ul class="unstyled">
-              <li><strong>ID:</strong> {workerState.workerId}</li>
-              <li><strong>
-                Master URL:</strong> {workerState.masterUrl}
-              </li>
-              <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
-              <li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)}
-                ({Utils.megabytesToString(workerState.memoryUsed)} Used)</li>
-            </ul>
-            <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
-          </div>
-        </div>
-
-        <div class="row-fluid"> <!-- Running Executors -->
-          <div class="span12">
-            <h4> Running Executors {workerState.executors.size} </h4>
-            {runningExecutorTable}
-          </div>
-        </div>
-        // scalastyle:off
-        <div>
-          {if (hasDrivers)
-            <div class="row-fluid"> <!-- Running Drivers -->
-              <div class="span12">
-                <h4> Running Drivers {workerState.drivers.size} </h4>
-                {runningDriverTable}
-              </div>
-            </div>
-          }
-        </div>
-
-        <div class="row-fluid"> <!-- Finished Executors  -->
-          <div class="span12">
-            <h4> Finished Executors </h4>
-            {finishedExecutorTable}
-          </div>
-        </div>
-
-        <div>
-          {if (hasDrivers)
-            <div class="row-fluid"> <!-- Finished Drivers  -->
-              <div class="span12">
-                <h4> Finished Drivers </h4>
-                {finishedDriverTable}
-              </div>
-            </div>
-          }
-        </div>;
-    // scalastyle:on
-    UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
-      workerState.host, workerState.port))
-  }
-
-  def executorRow(executor: ExecutorRunner): Seq[Node] = {
-    <tr>
-      <td>{executor.execId}</td>
-      <td>{executor.cores}</td>
-      <td sorttable_customkey={executor.memory.toString}>
-        {Utils.megabytesToString(executor.memory)}
-      </td>
-      <td>
-        <ul class="unstyled">
-          <li><strong>ID:</strong> {executor.appId}</li>
-          <li><strong>Name:</strong> {executor.appDesc.name}</li>
-          <li><strong>User:</strong> {executor.appDesc.user}</li>
-        </ul>
-      </td>
-      <td>
-     <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
-        .format(executor.appId, executor.execId)}>stdout</a>
-     <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
-        .format(executor.appId, executor.execId)}>stderr</a>
-      </td>
-    </tr>
-
-  }
-
-  def driverRow(driver: DriverRunner): Seq[Node] = {
-    <tr>
-      <td>{driver.driverId}</td>
-      <td>{driver.driverDesc.command.arguments(1)}</td>
-      <td>{driver.finalState.getOrElse(DriverState.RUNNING)}</td>
-      <td sorttable_customkey={driver.driverDesc.cores.toString}>
-        {driver.driverDesc.cores.toString}
-      </td>
-      <td sorttable_customkey={driver.driverDesc.mem.toString}>
-        {Utils.megabytesToString(driver.driverDesc.mem)}
-      </td>
-      <td>
-        <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
-        <a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a>
-      </td>
-      <td>
-        {driver.finalException.getOrElse("")}
-      </td>
-    </tr>
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
new file mode 100644
index 0000000..fec1207
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker.ui
+
+import java.io.File
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
+  private val worker = parent.worker
+  private val workDir = parent.workDir
+
+  def renderLog(request: HttpServletRequest): String = {
+    val defaultBytes = 100 * 1024
+
+    val appId = Option(request.getParameter("appId"))
+    val executorId = Option(request.getParameter("executorId"))
+    val driverId = Option(request.getParameter("driverId"))
+    val logType = request.getParameter("logType")
+    val offset = Option(request.getParameter("offset")).map(_.toLong)
+    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+
+    val path = (appId, executorId, driverId) match {
+      case (Some(a), Some(e), None) =>
+        s"${workDir.getPath}/$appId/$executorId/$logType"
+      case (None, None, Some(d)) =>
+        s"${workDir.getPath}/$driverId/$logType"
+      case _ =>
+        throw new Exception("Request must specify either application or driver identifiers")
+    }
+
+    val (startByte, endByte) = getByteRange(path, offset, byteLength)
+    val file = new File(path)
+    val logLength = file.length
+
+    val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
+    pre + Utils.offsetBytes(path, startByte, endByte)
+  }
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val defaultBytes = 100 * 1024
+    val appId = Option(request.getParameter("appId"))
+    val executorId = Option(request.getParameter("executorId"))
+    val driverId = Option(request.getParameter("driverId"))
+    val logType = request.getParameter("logType")
+    val offset = Option(request.getParameter("offset")).map(_.toLong)
+    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+
+    val (path, params) = (appId, executorId, driverId) match {
+      case (Some(a), Some(e), None) =>
+        (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
+      case (None, None, Some(d)) =>
+        (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+      case _ =>
+        throw new Exception("Request must specify either application or driver identifiers")
+    }
+
+    val (startByte, endByte) = getByteRange(path, offset, byteLength)
+    val file = new File(path)
+    val logLength = file.length
+    val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+    val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
+    val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
+
+    val backButton =
+      if (startByte > 0) {
+        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
+          .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}>
+          <button type="button" class="btn btn-default">
+            Previous {Utils.bytesToString(math.min(byteLength, startByte))}
+          </button>
+        </a>
+      }
+      else {
+        <button type="button" class="btn btn-default" disabled="disabled">
+          Previous 0 B
+        </button>
+      }
+
+    val nextButton =
+      if (endByte < logLength) {
+        <a href={"?%s&logType=%s&offset=%s&byteLength=%s".
+          format(params, logType, endByte, byteLength)}>
+          <button type="button" class="btn btn-default">
+            Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
+          </button>
+        </a>
+      }
+      else {
+        <button type="button" class="btn btn-default" disabled="disabled">
+          Next 0 B
+        </button>
+      }
+
+    val content =
+      <html>
+        <body>
+          {linkToMaster}
+          <div>
+            <div style="float:left; margin-right:10px">{backButton}</div>
+            <div style="float:left;">{range}</div>
+            <div style="float:right; margin-left:10px">{nextButton}</div>
+          </div>
+          <br />
+          <div style="height:500px; overflow:auto; padding:5px;">
+            <pre>{logText}</pre>
+          </div>
+        </body>
+      </html>
+    UIUtils.basicSparkPage(content, logType + " log page for " + appId)
+  }
+
+  /** Determine the byte range for a log or log page. */
+  private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
+    val defaultBytes = 100 * 1024
+    val maxBytes = 1024 * 1024
+    val file = new File(path)
+    val logLength = file.length()
+    val getOffset = offset.getOrElse(logLength - defaultBytes)
+    val startByte =
+      if (getOffset < 0) 0L
+      else if (getOffset > logLength) logLength
+      else getOffset
+    val logPageLength = math.min(byteLength, maxBytes)
+    val endByte = math.min(startByte + logPageLength, logLength)
+    (startByte, endByte)
+  }
+}
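
The getByteRange helper above clamps the requested offset into [0, logLength] (defaulting to the last 100 KB of the file) and caps a single page at 1 MB. A standalone restatement of that arithmetic, useful as a quick sanity check:

    // Restatement of the clamping in getByteRange; byteRange is a hypothetical name.
    def byteRange(logLength: Long, offset: Option[Long], byteLength: Int): (Long, Long) = {
      val defaultBytes = 100 * 1024   // default page: the last 100 KB of the log
      val maxBytes = 1024 * 1024      // hard cap: never serve more than 1 MB per page
      val requested = offset.getOrElse(logLength - defaultBytes)
      val startByte = math.min(math.max(requested, 0L), logLength)
      val endByte = math.min(startByte + math.min(byteLength, maxBytes), logLength)
      (startByte, endByte)
    }

    // e.g. byteRange(logLength = 1000000L, offset = None, byteLength = 100 * 1024)
    //      == (897600L, 1000000L), i.e. the last 100 KB of the file.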

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
new file mode 100644
index 0000000..d451311
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker.ui
+
+import scala.concurrent.Await
+import scala.xml.Node
+
+import akka.pattern.ask
+import javax.servlet.http.HttpServletRequest
+import org.json4s.JValue
+
+import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
+import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
+  val workerActor = parent.worker.self
+  val worker = parent.worker
+  val timeout = parent.timeout
+
+  override def renderJson(request: HttpServletRequest): JValue = {
+    val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
+    val workerState = Await.result(stateFuture, timeout)
+    JsonProtocol.writeWorkerState(workerState)
+  }
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
+    val workerState = Await.result(stateFuture, timeout)
+
+    val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
+    val runningExecutorTable =
+      UIUtils.listingTable(executorHeaders, executorRow, workerState.executors)
+    val finishedExecutorTable =
+      UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
+
+    val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes")
+    val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
+    val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
+    val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
+    def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
+
+    // For now we only show driver information if the user has submitted drivers to the cluster.
+    // This is until we integrate the notion of drivers and applications in the UI.
+    def hasDrivers = runningDrivers.length > 0 || finishedDrivers.length > 0
+
+    val content =
+        <div class="row-fluid"> <!-- Worker Details -->
+          <div class="span12">
+            <ul class="unstyled">
+              <li><strong>ID:</strong> {workerState.workerId}</li>
+              <li><strong>
+                Master URL:</strong> {workerState.masterUrl}
+              </li>
+              <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
+              <li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)}
+                ({Utils.megabytesToString(workerState.memoryUsed)} Used)</li>
+            </ul>
+            <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
+          </div>
+        </div>
+
+        <div class="row-fluid"> <!-- Running Executors -->
+          <div class="span12">
+            <h4> Running Executors {workerState.executors.size} </h4>
+            {runningExecutorTable}
+          </div>
+        </div>
+        // scalastyle:off
+        <div>
+          {if (hasDrivers)
+            <div class="row-fluid"> <!-- Running Drivers -->
+              <div class="span12">
+                <h4> Running Drivers {workerState.drivers.size} </h4>
+                {runningDriverTable}
+              </div>
+            </div>
+          }
+        </div>
+
+        <div class="row-fluid"> <!-- Finished Executors  -->
+          <div class="span12">
+            <h4> Finished Executors </h4>
+            {finishedExecutorTable}
+          </div>
+        </div>
+
+        <div>
+          {if (hasDrivers)
+            <div class="row-fluid"> <!-- Finished Drivers  -->
+              <div class="span12">
+                <h4> Finished Drivers </h4>
+                {finishedDriverTable}
+              </div>
+            </div>
+          }
+        </div>;
+    // scalastyle:on
+    UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
+      workerState.host, workerState.port))
+  }
+
+  def executorRow(executor: ExecutorRunner): Seq[Node] = {
+    <tr>
+      <td>{executor.execId}</td>
+      <td>{executor.cores}</td>
+      <td sorttable_customkey={executor.memory.toString}>
+        {Utils.megabytesToString(executor.memory)}
+      </td>
+      <td>
+        <ul class="unstyled">
+          <li><strong>ID:</strong> {executor.appId}</li>
+          <li><strong>Name:</strong> {executor.appDesc.name}</li>
+          <li><strong>User:</strong> {executor.appDesc.user}</li>
+        </ul>
+      </td>
+      <td>
+     <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
+        .format(executor.appId, executor.execId)}>stdout</a>
+     <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
+        .format(executor.appId, executor.execId)}>stderr</a>
+      </td>
+    </tr>
+
+  }
+
+  def driverRow(driver: DriverRunner): Seq[Node] = {
+    <tr>
+      <td>{driver.driverId}</td>
+      <td>{driver.driverDesc.command.arguments(1)}</td>
+      <td>{driver.finalState.getOrElse(DriverState.RUNNING)}</td>
+      <td sorttable_customkey={driver.driverDesc.cores.toString}>
+        {driver.driverDesc.cores.toString}
+      </td>
+      <td sorttable_customkey={driver.driverDesc.mem.toString}>
+        {Utils.megabytesToString(driver.driverDesc.mem)}
+      </td>
+      <td>
+        <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
+        <a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a>
+      </td>
+      <td>
+        {driver.finalException.getOrElse("")}
+      </td>
+    </tr>
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 5625a44..0ad2edb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -20,174 +20,44 @@ package org.apache.spark.deploy.worker.ui
 import java.io.File
 import javax.servlet.http.HttpServletRequest
 
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
+import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.AkkaUtils
 
 /**
  * Web UI server for the standalone worker.
  */
 private[spark]
-class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
-  extends WebUI("WorkerWebUI") with Logging {
+class WorkerWebUI(
+    val worker: Worker,
+    val workDir: File,
+    port: Option[Int] = None)
+  extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf)
+  with Logging {
 
   val timeout = AkkaUtils.askTimeout(worker.conf)
 
-  private val host = Utils.localHostName()
-  private val port = requestedPort.getOrElse(
-    worker.conf.getInt("worker.ui.port",  WorkerWebUI.DEFAULT_PORT))
-  private val indexPage = new IndexPage(this)
-
-  private val handlers: Seq[ServletContextHandler] = {
-    worker.metricsSystem.getServletHandlers ++
-    Seq[ServletContextHandler](
-      createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"),
-      createServletHandler("/log",
-        (request: HttpServletRequest) => log(request), worker.securityMgr),
-      createServletHandler("/logPage",
-        (request: HttpServletRequest) => logPage(request), worker.securityMgr),
-      createServletHandler("/json",
-        (request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr),
-      createServletHandler("/",
-        (request: HttpServletRequest) => indexPage.render(request), worker.securityMgr)
-    )
-  }
-
-  /** Bind to the HTTP server behind this web interface. */
-  override def bind() {
-    try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf))
-      logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
-    } catch {
-      case e: Exception =>
-        logError("Failed to create Worker web UI", e)
-        System.exit(1)
-    }
-  }
-
-  private def log(request: HttpServletRequest): String = {
-    val defaultBytes = 100 * 1024
-
-    val appId = Option(request.getParameter("appId"))
-    val executorId = Option(request.getParameter("executorId"))
-    val driverId = Option(request.getParameter("driverId"))
-    val logType = request.getParameter("logType")
-    val offset = Option(request.getParameter("offset")).map(_.toLong)
-    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-
-    val path = (appId, executorId, driverId) match {
-      case (Some(a), Some(e), None) =>
-        s"${workDir.getPath}/$appId/$executorId/$logType"
-      case (None, None, Some(d)) =>
-        s"${workDir.getPath}/$driverId/$logType"
-      case _ =>
-        throw new Exception("Request must specify either application or driver identifiers")
-    }
-
-    val (startByte, endByte) = getByteRange(path, offset, byteLength)
-    val file = new File(path)
-    val logLength = file.length
-
-    val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
-    pre + Utils.offsetBytes(path, startByte, endByte)
-  }
-
-  private def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
-    val defaultBytes = 100 * 1024
-    val appId = Option(request.getParameter("appId"))
-    val executorId = Option(request.getParameter("executorId"))
-    val driverId = Option(request.getParameter("driverId"))
-    val logType = request.getParameter("logType")
-    val offset = Option(request.getParameter("offset")).map(_.toLong)
-    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-
-    val (path, params) = (appId, executorId, driverId) match {
-      case (Some(a), Some(e), None) =>
-        (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
-      case (None, None, Some(d)) =>
-        (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
-      case _ =>
-        throw new Exception("Request must specify either application or driver identifiers")
-    }
-
-    val (startByte, endByte) = getByteRange(path, offset, byteLength)
-    val file = new File(path)
-    val logLength = file.length
-    val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
-    val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
-    val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
-
-    val backButton =
-      if (startByte > 0) {
-        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
-          .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}>
-          <button type="button" class="btn btn-default">
-            Previous {Utils.bytesToString(math.min(byteLength, startByte))}
-          </button>
-        </a>
-      }
-      else {
-        <button type="button" class="btn btn-default" disabled="disabled">
-          Previous 0 B
-        </button>
-      }
-
-    val nextButton =
-      if (endByte < logLength) {
-        <a href={"?%s&logType=%s&offset=%s&byteLength=%s".
-          format(params, logType, endByte, byteLength)}>
-          <button type="button" class="btn btn-default">
-            Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
-          </button>
-        </a>
-      }
-      else {
-        <button type="button" class="btn btn-default" disabled="disabled">
-          Next 0 B
-        </button>
-      }
-
-    val content =
-      <html>
-        <body>
-          {linkToMaster}
-          <div>
-            <div style="float:left; margin-right:10px">{backButton}</div>
-            <div style="float:left;">{range}</div>
-            <div style="float:right; margin-left:10px">{nextButton}</div>
-          </div>
-          <br />
-          <div style="height:500px; overflow:auto; padding:5px;">
-            <pre>{logText}</pre>
-          </div>
-        </body>
-      </html>
-    UIUtils.basicSparkPage(content, logType + " log page for " + appId)
+  initialize()
+
+  /** Initialize all components of the server. */
+  def initialize() {
+    val logPage = new LogPage(this)
+    attachPage(logPage)
+    attachPage(new WorkerPage(this))
+    attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"))
+    attachHandler(createServletHandler("/log",
+      (request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr))
+    worker.metricsSystem.getServletHandlers.foreach(attachHandler)
   }
-
-  /** Determine the byte range for a log or log page. */
-  private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
-    val defaultBytes = 100 * 1024
-    val maxBytes = 1024 * 1024
-    val file = new File(path)
-    val logLength = file.length()
-    val getOffset = offset.getOrElse(logLength - defaultBytes)
-    val startByte =
-      if (getOffset < 0) 0L
-      else if (getOffset > logLength) logLength
-      else getOffset
-    val logPageLength = math.min(byteLength, maxBytes)
-    val endByte = math.min(startByte + logPageLength, logLength)
-    (startByte, endByte)
-  }
-
 }
 
 private[spark] object WorkerWebUI {
-  val DEFAULT_PORT=8081
+  val DEFAULT_PORT = 8081
   val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
+
+  def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = {
+    requestedPort.getOrElse(conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
+  }
 }
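
The new getUIPort helper resolves the worker UI port from an explicitly requested port first, then the worker.ui.port configuration key, then the built-in default of 8081. A small sketch of that resolution order, with a plain Map standing in for SparkConf:

    // Map stands in for SparkConf here; resolveUiPort is a hypothetical name.
    def resolveUiPort(requested: Option[Int], conf: Map[String, String]): Int =
      requested.getOrElse(conf.get("worker.ui.port").map(_.toInt).getOrElse(8081))

    // resolveUiPort(Some(9090), Map.empty)                 == 9090
    // resolveUiPort(None, Map("worker.ui.port" -> "8088")) == 8088
    // resolveUiPort(None, Map.empty)                        == 8081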

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index affda13..c100122 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -31,11 +31,11 @@ private[spark] class ApplicationEventListener extends SparkListener {
 
   def applicationStarted = startTime != -1
 
-  def applicationFinished = endTime != -1
+  def applicationCompleted = endTime != -1
 
   def applicationDuration: Long = {
     val difference = endTime - startTime
-    if (applicationStarted && applicationFinished && difference > 0) difference else -1L
+    if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
   }
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
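
The rename above keeps the duration guard intact: a duration is only reported once both the start and end timestamps have been recorded and their difference is positive; otherwise -1 signals "unknown". A standalone restatement of that check:

    // Standalone restatement of applicationDuration; appDuration is a hypothetical name.
    def appDuration(startTime: Long, endTime: Long): Long = {
      val started = startTime != -1
      val completed = endTime != -1
      val difference = endTime - startTime
      if (started && completed && difference > 0) difference else -1L
    }

    // appDuration(-1, 500)  == -1   (never started)
    // appDuration(100, 500) == 400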

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 07255aa..7ed3713 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -42,24 +42,22 @@ class StorageStatus(
 
   def memRemaining : Long = maxMem - memUsed()
 
-  def rddBlocks = blocks.flatMap {
-    case (rdd: RDDBlockId, status) => Some(rdd, status)
-    case _ => None
-  }
+  def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }
 }
 
 @DeveloperApi
 private[spark]
 class RDDInfo(
-  val id: Int,
-  val name: String,
-  val numPartitions: Int,
-  val storageLevel: StorageLevel) extends Ordered[RDDInfo] {
+    val id: Int,
+    val name: String,
+    val numPartitions: Int,
+    val storageLevel: StorageLevel)
+  extends Ordered[RDDInfo] {
 
   var numCachedPartitions = 0
   var memSize = 0L
   var diskSize = 0L
-  var tachyonSize= 0L
+  var tachyonSize = 0L
 
   override def toString = {
     import Utils.bytesToString
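
The rddBlocks change replaces a flatMap over Options with collect and a partial function; a small Spark-free sketch of the same idiom on an ordinary Map (the block-id case classes here are illustrative stand-ins for BlockId/RDDBlockId):

    sealed trait BlockId
    case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
    case class ShuffleBlockId(shuffleId: Int) extends BlockId

    object CollectSketch {
      def main(args: Array[String]) {
        val blocks: Map[BlockId, String] =
          Map(RDDBlockId(0, 1) -> "cached", ShuffleBlockId(7) -> "spilled")

        // collect keeps only the pairs the partial function is defined for, so the
        // flatMap { case ... => Some(...); case _ => None } dance goes away.
        val rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }

        println(rddBlocks)  // Map(RDDBlockId(0,1) -> cached)
      }
    }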

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index dd0818e..62a4e3d 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -121,6 +121,7 @@ private[spark] object JettyUtils extends Logging {
   /** Create a handler for serving files from a static directory */
   def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
     val contextHandler = new ServletContextHandler
+    contextHandler.setInitParameter("org.eclipse.jetty.servlet.Default.gzip", "false")
     val staticHandler = new DefaultServlet
     val holder = new ServletHolder(staticHandler)
     Option(getClass.getClassLoader.getResource(resourceBase)) match {
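
For context, a sketch of how the static handler is assembled around the new init parameter, assuming Jetty's servlet classes on the classpath. Only the setInitParameter line comes from this change; the rest is a hedged reconstruction of the surrounding JettyUtils method.

    import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}

    object StaticHandlerSketch {
      def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
        val contextHandler = new ServletContextHandler
        // New in this change: disable the DefaultServlet's gzip handling for static files.
        contextHandler.setInitParameter("org.eclipse.jetty.servlet.Default.gzip", "false")
        val holder = new ServletHolder(new DefaultServlet)
        // Serve the directory resolved from the classpath.
        Option(getClass.getClassLoader.getResource(resourceBase)) match {
          case Some(res) => holder.setInitParameter("resourceBase", res.toString)
          case None => throw new Exception("Could not find resource path: " + resourceBase)
        }
        contextHandler.setContextPath(path)
        contextHandler.addServlet(holder, "/")
        contextHandler
      }
    }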

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/Page.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/Page.scala b/core/src/main/scala/org/apache/spark/ui/Page.scala
deleted file mode 100644
index b2a069a..0000000
--- a/core/src/main/scala/org/apache/spark/ui/Page.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui
-
-private[spark] object Page extends Enumeration {
-  val Stages, Storage, Environment, Executors = Value
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 7fa4fd3..2fef1a6 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,112 +17,86 @@
 
 package org.apache.spark.ui
 
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.env.EnvironmentUI
-import org.apache.spark.ui.exec.ExecutorsUI
-import org.apache.spark.ui.jobs.JobProgressUI
-import org.apache.spark.ui.storage.BlockManagerUI
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.env.EnvironmentTab
+import org.apache.spark.ui.exec.ExecutorsTab
+import org.apache.spark.ui.jobs.JobProgressTab
+import org.apache.spark.ui.storage.StorageTab
 
-/** Top level user interface for Spark */
+/**
+ * Top level user interface for a Spark application.
+ */
 private[spark] class SparkUI(
     val sc: SparkContext,
     val conf: SparkConf,
+    val securityManager: SecurityManager,
     val listenerBus: SparkListenerBus,
     var appName: String,
     val basePath: String = "")
-  extends WebUI("SparkUI") with Logging {
+  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath)
+  with Logging {
 
-  def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName)
-  def this(listenerBus: SparkListenerBus, appName: String, basePath: String) =
-    this(null, new SparkConf, listenerBus, appName, basePath)
+  def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
+  def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
+    this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath)
 
   // If SparkContext is not provided, assume the associated application is not live
   val live = sc != null
 
-  val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
-
-  private val localHost = Utils.localHostName()
-  private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
-  private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
+  // Maintain executor storage status through Spark events
+  val storageStatusListener = new StorageStatusListener
 
-  private val storage = new BlockManagerUI(this)
-  private val jobs = new JobProgressUI(this)
-  private val env = new EnvironmentUI(this)
-  private val exec = new ExecutorsUI(this)
+  initialize()
 
-  val handlers: Seq[ServletContextHandler] = {
-    val metricsServletHandlers = if (live) {
-      SparkEnv.get.metricsSystem.getServletHandlers
-    } else {
-      Array[ServletContextHandler]()
+  /** Initialize all components of the server. */
+  def initialize() {
+    listenerBus.addListener(storageStatusListener)
+    val jobProgressTab = new JobProgressTab(this)
+    attachTab(jobProgressTab)
+    attachTab(new StorageTab(this))
+    attachTab(new EnvironmentTab(this))
+    attachTab(new ExecutorsTab(this))
+    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+    attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
+    attachHandler(
+      createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
+    if (live) {
+      sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
     }
-    storage.getHandlers ++
-    jobs.getHandlers ++
-    env.getHandlers ++
-    exec.getHandlers ++
-    metricsServletHandlers ++
-    Seq[ServletContextHandler] (
-      createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"),
-      createRedirectHandler("/", "/stages", basePath = basePath)
-    )
   }
 
-  // Maintain executor storage status through Spark events
-  val storageStatusListener = new StorageStatusListener
-
+  /** Set the app name for this UI. */
   def setAppName(name: String) {
     appName = name
   }
 
-  /** Initialize all components of the server */
-  def start() {
-    storage.start()
-    jobs.start()
-    env.start()
-    exec.start()
-
-    // Storage status listener must receive events first, as other listeners depend on its state
-    listenerBus.addListener(storageStatusListener)
-    listenerBus.addListener(storage.listener)
-    listenerBus.addListener(jobs.listener)
-    listenerBus.addListener(env.listener)
-    listenerBus.addListener(exec.listener)
-  }
-
-  /** Bind to the HTTP server behind this web interface. */
-  override def bind() {
-    try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf))
-      logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort))
-    } catch {
-      case e: Exception =>
-        logError("Failed to create Spark web UI", e)
-        System.exit(1)
-    }
+  /** Register the given listener with the listener bus. */
+  def registerListener(listener: SparkListener) {
+    listenerBus.addListener(listener)
   }
 
   /** Stop the server behind this web interface. Only valid after bind(). */
   override def stop() {
     super.stop()
-    logInfo("Stopped Spark Web UI at %s".format(appUIAddress))
+    logInfo("Stopped Spark web UI at %s".format(appUIAddress))
   }
 
   /**
    * Return the application UI host:port. This does not include the scheme (http://).
    */
-  private[spark] def appUIHostPort = publicHost + ":" + boundPort
+  private[spark] def appUIHostPort = publicHostName + ":" + boundPort
 
   private[spark] def appUIAddress = s"http://$appUIHostPort"
-
 }
 
 private[spark] object SparkUI {
   val DEFAULT_PORT = 4040
   val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
+
+  def getUIPort(conf: SparkConf): Int = {
+    conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
+  }
 }
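
A minimal sketch of the two pieces of plumbing SparkUI now exposes, written against the public API rather than the private[spark] one: reading "spark.ui.port" the way getUIPort does, and routing an extra listener through the listener bus (here via SparkContext.addSparkListener; the StageCountListener is illustrative, not part of Spark).

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

    // Hypothetical listener; UI tabs register theirs through SparkUI.registerListener.
    class StageCountListener extends SparkListener {
      @volatile var stagesSubmitted = 0
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
        stagesSubmitted += 1
      }
    }

    object UiPortSketch {
      def main(args: Array[String]) {
        val conf = new SparkConf().setMaster("local").setAppName("ui-port-sketch")
        println(conf.getInt("spark.ui.port", 4040))  // same lookup SparkUI.getUIPort performs

        val sc = new SparkContext(conf)
        val listener = new StageCountListener
        sc.addSparkListener(listener)  // public counterpart of registerListener
        sc.parallelize(1 to 10).count()
        sc.stop()                      // shuts down the listener bus
        println("stages submitted: " + listener.stagesSubmitted)
      }
    }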