You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/12 08:34:05 UTC

[2/3] [SPARK-1386] Web UI for Spark Streaming

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index a7cf04b..6a2d652 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -17,16 +17,115 @@
 
 package org.apache.spark.ui
 
+import java.text.SimpleDateFormat
+import java.util.{Locale, Date}
+
 import scala.xml.Node
+import org.apache.spark.Logging
 
 /** Utility functions for generating XML pages with spark content. */
-private[spark] object UIUtils {
+private[spark] object UIUtils extends Logging {
+
+  // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  }
+
+  def formatDate(date: Date): String = dateFormat.get.format(date)
+
+  def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
+
+  def formatDuration(milliseconds: Long): String = {
+    val seconds = milliseconds.toDouble / 1000
+    if (seconds < 60) {
+      return "%.0f s".format(seconds)
+    }
+    val minutes = seconds / 60
+    if (minutes < 10) {
+      return "%.1f min".format(minutes)
+    } else if (minutes < 60) {
+      return "%.0f min".format(minutes)
+    }
+    val hours = minutes / 60
+    "%.1f h".format(hours)
+  }
+
+  /** Generate a verbose human-readable string representing a duration such as "5 seconds 35 ms" */
+  def formatDurationVerbose(ms: Long): String = {
+    try {
+      val second = 1000L
+      val minute = 60 * second
+      val hour = 60 * minute
+      val day = 24 * hour
+      val week = 7 * day
+      val year = 365 * day
+
+      def toString(num: Long, unit: String): String = {
+        if (num == 0) {
+          ""
+        } else if (num == 1) {
+          s"$num $unit"
+        } else {
+          s"$num ${unit}s"
+        }
+      }
+
+      val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms"
+      val secondString = toString((ms % minute) / second, "second")
+      val minuteString = toString((ms % hour) / minute, "minute")
+      val hourString = toString((ms % day) / hour, "hour")
+      val dayString = toString((ms % week) / day, "day")
+      val weekString = toString((ms % year) / week, "week")
+      val yearString = toString(ms / year, "year")
 
-  import Page._
+      Seq(
+        second -> millisecondsString,
+        minute -> s"$secondString $millisecondsString",
+        hour -> s"$minuteString $secondString",
+        day -> s"$hourString $minuteString $secondString",
+        week -> s"$dayString $hourString $minuteString",
+        year -> s"$weekString $dayString $hourString"
+      ).foreach { case (durationLimit, durationString) =>
+        if (ms < durationLimit) {
+          // if time is less than the limit (up to a year)
+          return durationString
+        }
+      }
+      // if time is more than a year
+      return s"$yearString $weekString $dayString"
+    } catch {
+      case e: Exception =>
+        logError("Error converting time to string", e)
+        // if there is some error, return blank string
+        return ""
+    }
+  }
+
+  /** Generate a human-readable string representing a number (e.g. 100 K) */
+  def formatNumber(records: Double): String = {
+    val trillion = 1e12
+    val billion = 1e9
+    val million = 1e6
+    val thousand = 1e3
+
+    val (value, unit) = {
+      if (records >= 2*trillion) {
+        (records / trillion, " T")
+      } else if (records >= 2*billion) {
+        (records / billion, " B")
+      } else if (records >= 2*million) {
+        (records / million, " M")
+      } else if (records >= 2*thousand) {
+        (records / thousand, " K")
+      } else {
+        (records, "")
+      }
+    }
+    "%.1f%s".formatLocal(Locale.US, value, unit)
+  }
 
   // Yarn has to go through a proxy so the base uri is provided and has to be on all links
-  private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
-    getOrElse("")
+  val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
 
   def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
 
@@ -36,26 +135,14 @@ private[spark] object UIUtils {
       basePath: String,
       appName: String,
       title: String,
-      page: Page.Value) : Seq[Node] = {
-    val jobs = page match {
-      case Stages =>
-        <li class="active"><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
-    }
-    val storage = page match {
-      case Storage =>
-        <li class="active"><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
-    }
-    val environment = page match {
-      case Environment =>
-        <li class="active"><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
-    }
-    val executors = page match {
-      case Executors =>
-        <li class="active"><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
+      tabs: Seq[WebUITab],
+      activeTab: WebUITab,
+      refreshInterval: Option[Int] = None): Seq[Node] = {
+
+    val header = tabs.map { tab =>
+      <li class={if (tab == activeTab) "active" else ""}>
+        <a href={prependBaseUri(basePath, "/" + tab.prefix)}>{tab.name}</a>
+      </li>
     }
 
     <html>
@@ -74,16 +161,10 @@ private[spark] object UIUtils {
             <a href={prependBaseUri(basePath, "/")} class="brand">
               <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
             </a>
-            <ul class="nav">
-              {jobs}
-              {storage}
-              {environment}
-              {executors}
-            </ul>
+            <ul class="nav">{header}</ul>
             <p class="navbar-text pull-right"><strong>{appName}</strong> application UI</p>
           </div>
         </div>
-
         <div class="container-fluid">
           <div class="row-fluid">
             <div class="span12">
@@ -129,21 +210,36 @@ private[spark] object UIUtils {
   /** Returns an HTML table constructed by generating a row for each object in a sequence. */
   def listingTable[T](
       headers: Seq[String],
-      makeRow: T => Seq[Node],
-      rows: Seq[T],
+      generateDataRow: T => Seq[Node],
+      data: Seq[T],
       fixedWidth: Boolean = false): Seq[Node] = {
 
-    val colWidth = 100.toDouble / headers.size
-    val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
     var tableClass = "table table-bordered table-striped table-condensed sortable"
     if (fixedWidth) {
       tableClass += " table-fixed"
     }
-
+    val colWidth = 100.toDouble / headers.size
+    val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
+    val headerRow: Seq[Node] = {
+      // if none of the headers have "\n" in them
+      if (headers.forall(!_.contains("\n"))) {
+        // represent header as simple text
+        headers.map(h => <th width={colWidthAttr}>{h}</th>)
+      } else {
+        // represent header text as list while respecting "\n"
+        headers.map { case h =>
+          <th width={colWidthAttr}>
+            <ul class ="unstyled">
+              { h.split("\n").map { case t => <li> {t} </li> } }
+            </ul>
+          </th>
+        }
+      }
+    }
     <table class={tableClass}>
-      <thead>{headers.map(h => <th width={colWidthAttr}>{h}</th>)}</thead>
+      <thead>{headerRow}</thead>
       <tbody>
-        {rows.map(r => makeRow(r))}
+        {data.map(r => generateDataRow(r))}
       </tbody>
     </table>
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 2cc7582..b08f308 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -17,53 +17,134 @@
 
 package org.apache.spark.ui
 
-import java.text.SimpleDateFormat
-import java.util.Date
+import javax.servlet.http.HttpServletRequest
 
-private[spark] abstract class WebUI(name: String) {
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.Node
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.json4s.JsonAST.{JNothing, JValue}
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
+
+/**
+ * The top level component of the UI hierarchy that contains the server.
+ *
+ * Each WebUI represents a collection of tabs, each of which in turn represents a collection of
+ * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
+ */
+private[spark] abstract class WebUI(
+    securityManager: SecurityManager,
+    port: Int,
+    conf: SparkConf,
+    basePath: String = "")
+  extends Logging {
+
+  protected val tabs = ArrayBuffer[WebUITab]()
+  protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
+  protected val localHostName = Utils.localHostName()
+  protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+  private val className = Utils.getFormattedClassName(this)
+
+  def getTabs: Seq[WebUITab] = tabs.toSeq
+  def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
+
+  /** Attach a tab to this UI, along with all of its attached pages. */
+  def attachTab(tab: WebUITab) {
+    tab.pages.foreach(attachPage)
+    tabs += tab
+  }
+
+  /** Attach a page to this UI. */
+  def attachPage(page: WebUIPage) {
+    val pagePath = "/" + page.prefix
+    attachHandler(createServletHandler(pagePath,
+      (request: HttpServletRequest) => page.render(request), securityManager, basePath))
+    attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
+      (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
+  }
+
+  /** Attach a handler to this UI. */
+  def attachHandler(handler: ServletContextHandler) {
+    handlers += handler
+    serverInfo.foreach { info =>
+      info.rootHandler.addHandler(handler)
+      if (!handler.isStarted) {
+        handler.start()
+      }
+    }
+  }
 
-  /**
-   * Bind to the HTTP server behind this web interface.
-   * Overridden implementation should set serverInfo.
-   */
-  def bind() { }
+  /** Detach a handler from this UI. */
+  def detachHandler(handler: ServletContextHandler) {
+    handlers -= handler
+    serverInfo.foreach { info =>
+      info.rootHandler.removeHandler(handler)
+      if (handler.isStarted) {
+        handler.stop()
+      }
+    }
+  }
+
+  /** Initialize all components of the server. */
+  def initialize()
+
+  /** Bind to the HTTP server behind this web interface. */
+  def bind() {
+    assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
+    try {
+      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+      logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
+    } catch {
+      case e: Exception =>
+        logError("Failed to bind %s".format(className), e)
+        System.exit(1)
+    }
+  }
 
   /** Return the actual port to which this server is bound. Only valid after bind(). */
   def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
 
   /** Stop the server behind this web interface. Only valid after bind(). */
   def stop() {
-    assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(name))
+    assert(serverInfo.isDefined,
+      "Attempted to stop %s before binding to a server!".format(className))
     serverInfo.get.server.stop()
   }
 }
 
+
 /**
- * Utilities used throughout the web UI.
+ * A tab that represents a collection of pages.
+ * The prefix is appended to the parent address to form a full path, and must not contain slashes.
  */
-private[spark] object WebUI {
-  // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
-  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
-    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
+  val pages = ArrayBuffer[WebUIPage]()
+  val name = prefix.capitalize
+
+  /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
+  def attachPage(page: WebUIPage) {
+    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
+    pages += page
   }
 
-  def formatDate(date: Date): String = dateFormat.get.format(date)
+  /** Get a list of header tabs from the parent UI. */
+  def headerTabs: Seq[WebUITab] = parent.getTabs
+}
 
-  def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
 
-  def formatDuration(milliseconds: Long): String = {
-    val seconds = milliseconds.toDouble / 1000
-    if (seconds < 60) {
-      return "%.0f s".format(seconds)
-    }
-    val minutes = seconds / 60
-    if (minutes < 10) {
-      return "%.1f min".format(minutes)
-    } else if (minutes < 60) {
-      return "%.0f min".format(minutes)
-    }
-    val hours = minutes / 60
-    "%.1f h".format(hours)
-  }
+/**
+ * A page that represents the leaf node in the UI hierarchy.
+ *
+ * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab.
+ * If the parent is a WebUI, the prefix is appended to the parent's address to form a full path.
+ * Else, if the parent is a WebUITab, the prefix is appended to the super prefix of the parent
+ * to form a relative path. The prefix must not contain slashes.
+ */
+private[spark] abstract class WebUIPage(var prefix: String) {
+  def render(request: HttpServletRequest): Seq[Node]
+  def renderJson(request: HttpServletRequest): JValue = JNothing
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
new file mode 100644
index 0000000..b347eb1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.env
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
+  private val appName = parent.appName
+  private val basePath = parent.basePath
+  private val listener = parent.listener
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val runtimeInformationTable = UIUtils.listingTable(
+      propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
+    val sparkPropertiesTable = UIUtils.listingTable(
+      propertyHeader, propertyRow, listener.sparkProperties, fixedWidth = true)
+    val systemPropertiesTable = UIUtils.listingTable(
+      propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
+    val classpathEntriesTable = UIUtils.listingTable(
+      classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
+    val content =
+      <span>
+        <h4>Runtime Information</h4> {runtimeInformationTable}
+        <h4>Spark Properties</h4> {sparkPropertiesTable}
+        <h4>System Properties</h4> {systemPropertiesTable}
+        <h4>Classpath Entries</h4> {classpathEntriesTable}
+      </span>
+
+    UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent)
+  }
+
+  private def propertyHeader = Seq("Name", "Value")
+  private def classPathHeaders = Seq("Resource", "Source")
+  private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+  private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+  private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
new file mode 100644
index 0000000..03b46e1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.env
+
+import org.apache.spark.scheduler._
+import org.apache.spark.ui._
+
+private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") {
+  val appName = parent.appName
+  val basePath = parent.basePath
+  val listener = new EnvironmentListener
+
+  attachPage(new EnvironmentPage(this))
+  parent.registerListener(listener)
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the EnvironmentTab
+ */
+private[ui] class EnvironmentListener extends SparkListener {
+  var jvmInformation = Seq[(String, String)]()
+  var sparkProperties = Seq[(String, String)]()
+  var systemProperties = Seq[(String, String)]()
+  var classpathEntries = Seq[(String, String)]()
+
+  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
+    synchronized {
+      val environmentDetails = environmentUpdate.environmentDetails
+      jvmInformation = environmentDetails("JVM Information")
+      sparkProperties = environmentDetails("Spark Properties")
+      systemProperties = environmentDetails("System Properties")
+      classpathEntries = environmentDetails("Classpath Entries")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
deleted file mode 100644
index 33df971..0000000
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.env
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.scheduler._
-import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.Page.Environment
-
-private[ui] class EnvironmentUI(parent: SparkUI) {
-  private val basePath = parent.basePath
-  private var _listener: Option[EnvironmentListener] = None
-
-  private def appName = parent.appName
-
-  lazy val listener = _listener.get
-
-  def start() {
-    _listener = Some(new EnvironmentListener)
-  }
-
-  def getHandlers = Seq[ServletContextHandler](
-    createServletHandler("/environment",
-      (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
-  )
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val runtimeInformationTable = UIUtils.listingTable(
-      propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
-    val sparkPropertiesTable = UIUtils.listingTable(
-      propertyHeader, propertyRow, listener.sparkProperties, fixedWidth = true)
-    val systemPropertiesTable = UIUtils.listingTable(
-      propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
-    val classpathEntriesTable = UIUtils.listingTable(
-      classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
-    val content =
-      <span>
-        <h4>Runtime Information</h4> {runtimeInformationTable}
-        <h4>Spark Properties</h4> {sparkPropertiesTable}
-        <h4>System Properties</h4> {systemPropertiesTable}
-        <h4>Classpath Entries</h4> {classpathEntriesTable}
-      </span>
-
-    UIUtils.headerSparkPage(content, basePath, appName, "Environment", Environment)
-  }
-
-  private def propertyHeader = Seq("Name", "Value")
-  private def classPathHeaders = Seq("Resource", "Source")
-  private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
-  private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
-  private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
-}
-
-/**
- * A SparkListener that prepares information to be displayed on the EnvironmentUI
- */
-private[ui] class EnvironmentListener extends SparkListener {
-  var jvmInformation = Seq[(String, String)]()
-  var sparkProperties = Seq[(String, String)]()
-  var systemProperties = Seq[(String, String)]()
-  var classpathEntries = Seq[(String, String)]()
-
-  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
-    synchronized {
-      val environmentDetails = environmentUpdate.environmentDetails
-      jvmInformation = environmentDetails("JVM Information")
-      sparkProperties = environmentDetails("Spark Properties")
-      systemProperties = environmentDetails("System Properties")
-      classpathEntries = environmentDetails("Classpath Entries")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
new file mode 100644
index 0000000..c1e69f6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
+  private val appName = parent.appName
+  private val basePath = parent.basePath
+  private val listener = parent.listener
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val storageStatusList = listener.storageStatusList
+    val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
+    val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
+    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
+    val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
+    val execInfoSorted = execInfo.sortBy(_.getOrElse("Executor ID", ""))
+    val execTable = UIUtils.listingTable(execHeader, execRow, execInfoSorted)
+
+    val content =
+      <div class="row-fluid">
+        <div class="span12">
+          <ul class="unstyled">
+            <li><strong>Memory:</strong>
+              {Utils.bytesToString(memUsed)} Used
+              ({Utils.bytesToString(maxMem)} Total) </li>
+            <li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
+          </ul>
+        </div>
+      </div>
+      <div class = "row">
+        <div class="span12">
+          {execTable}
+        </div>
+      </div>;
+
+    UIUtils.headerSparkPage(content, basePath, appName, "Executors (" + execInfo.size + ")",
+      parent.headerTabs, parent)
+  }
+
+  /** Header fields for the executors table */
+  private def execHeader = Seq(
+    "Executor ID",
+    "Address",
+    "RDD Blocks",
+    "Memory Used",
+    "Disk Used",
+    "Active Tasks",
+    "Failed Tasks",
+    "Complete Tasks",
+    "Total Tasks",
+    "Task Time",
+    "Shuffle Read",
+    "Shuffle Write")
+
+  /** Render an HTML row representing an executor */
+  private def execRow(values: Map[String, String]): Seq[Node] = {
+    val maximumMemory = values("Maximum Memory")
+    val memoryUsed = values("Memory Used")
+    val diskUsed = values("Disk Used")
+    <tr>
+      <td>{values("Executor ID")}</td>
+      <td>{values("Address")}</td>
+      <td>{values("RDD Blocks")}</td>
+      <td sorttable_customkey={memoryUsed}>
+        {Utils.bytesToString(memoryUsed.toLong)} /
+        {Utils.bytesToString(maximumMemory.toLong)}
+      </td>
+      <td sorttable_customkey={diskUsed}>
+        {Utils.bytesToString(diskUsed.toLong)}
+      </td>
+      <td>{values("Active Tasks")}</td>
+      <td>{values("Failed Tasks")}</td>
+      <td>{values("Complete Tasks")}</td>
+      <td>{values("Total Tasks")}</td>
+      <td>{Utils.msDurationToString(values("Task Time").toLong)}</td>
+      <td>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
+      <td>{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
+    </tr>
+  }
+
+  /** Represent an executor's info as a map given a storage status index */
+  private def getExecInfo(statusId: Int): Map[String, String] = {
+    val status = listener.storageStatusList(statusId)
+    val execId = status.blockManagerId.executorId
+    val hostPort = status.blockManagerId.hostPort
+    val rddBlocks = status.blocks.size
+    val memUsed = status.memUsed()
+    val maxMem = status.maxMem
+    val diskUsed = status.diskUsed()
+    val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
+    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
+    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
+    val totalTasks = activeTasks + failedTasks + completedTasks
+    val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+    val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
+    val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
+
+    // Also include fields not in the header
+    val execFields = execHeader ++ Seq("Maximum Memory")
+
+    val execValues = Seq(
+      execId,
+      hostPort,
+      rddBlocks,
+      memUsed,
+      diskUsed,
+      activeTasks,
+      failedTasks,
+      completedTasks,
+      totalTasks,
+      totalDuration,
+      totalShuffleRead,
+      totalShuffleWrite,
+      maxMem
+    ).map(_.toString)
+
+    execFields.zip(execValues).toMap
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
new file mode 100644
index 0000000..5678bf3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.StorageStatusListener
+import org.apache.spark.ui.{SparkUI, WebUITab}
+
+private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") {
+  val appName = parent.appName
+  val basePath = parent.basePath
+  val listener = new ExecutorsListener(parent.storageStatusListener)
+
+  attachPage(new ExecutorsPage(this))
+  parent.registerListener(listener)
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the ExecutorsTab
+ */
+private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
+  extends SparkListener {
+
+  val executorToTasksActive = HashMap[String, Int]()
+  val executorToTasksComplete = HashMap[String, Int]()
+  val executorToTasksFailed = HashMap[String, Int]()
+  val executorToDuration = HashMap[String, Long]()
+  val executorToShuffleRead = HashMap[String, Long]()
+  val executorToShuffleWrite = HashMap[String, Long]()
+
+  def storageStatusList = storageStatusListener.storageStatusList
+
+  override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+    val eid = formatExecutorId(taskStart.taskInfo.executorId)
+    executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+    val info = taskEnd.taskInfo
+    if (info != null) {
+      val eid = formatExecutorId(info.executorId)
+      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
+      executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
+      taskEnd.reason match {
+        case e: ExceptionFailure =>
+          executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+        case _ =>
+          executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+      }
+
+      // Update shuffle read/write
+      val metrics = taskEnd.taskMetrics
+      if (metrics != null) {
+        metrics.shuffleReadMetrics.foreach { shuffleRead =>
+          executorToShuffleRead(eid) =
+            executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
+        }
+        metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+          executorToShuffleWrite(eid) =
+            executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
+        }
+      }
+    }
+  }
+
+  // This addresses executor ID inconsistencies in the local mode
+  private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
deleted file mode 100644
index 77a38a1..0000000
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.exec
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.collection.mutable.HashMap
-import scala.xml.Node
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.ExceptionFailure
-import org.apache.spark.scheduler._
-import org.apache.spark.storage.StorageStatusListener
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.Page.Executors
-import org.apache.spark.ui.{SparkUI, UIUtils}
-import org.apache.spark.util.Utils
-
-private[ui] class ExecutorsUI(parent: SparkUI) {
-  private val basePath = parent.basePath
-  private var _listener: Option[ExecutorsListener] = None
-
-  private def appName = parent.appName
-
-  lazy val listener = _listener.get
-
-  def start() {
-    _listener = Some(new ExecutorsListener(parent.storageStatusListener))
-  }
-
-  def getHandlers = Seq[ServletContextHandler](
-    createServletHandler("/executors",
-      (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
-  )
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val storageStatusList = listener.storageStatusList
-    val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
-    val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
-    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
-    val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
-    val execInfoSorted = execInfo.sortBy(_.getOrElse("Executor ID", ""))
-    val execTable = UIUtils.listingTable(execHeader, execRow, execInfoSorted)
-
-    val content =
-      <div class="row-fluid">
-        <div class="span12">
-          <ul class="unstyled">
-            <li><strong>Memory:</strong>
-              {Utils.bytesToString(memUsed)} Used
-              ({Utils.bytesToString(maxMem)} Total) </li>
-            <li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
-          </ul>
-        </div>
-      </div>
-      <div class = "row">
-        <div class="span12">
-          {execTable}
-        </div>
-      </div>;
-
-    UIUtils.headerSparkPage(
-      content, basePath, appName, "Executors (" + execInfo.size + ")", Executors)
-  }
-
-  /** Header fields for the executors table */
-  private def execHeader = Seq(
-    "Executor ID",
-    "Address",
-    "RDD Blocks",
-    "Memory Used",
-    "Disk Used",
-    "Active Tasks",
-    "Failed Tasks",
-    "Complete Tasks",
-    "Total Tasks",
-    "Task Time",
-    "Shuffle Read",
-    "Shuffle Write")
-
-  /** Render an HTML row representing an executor */
-  private def execRow(values: Map[String, String]): Seq[Node] = {
-    val maximumMemory = values("Maximum Memory")
-    val memoryUsed = values("Memory Used")
-    val diskUsed = values("Disk Used")
-    <tr>
-      <td>{values("Executor ID")}</td>
-      <td>{values("Address")}</td>
-      <td>{values("RDD Blocks")}</td>
-      <td sorttable_customkey={memoryUsed}>
-        {Utils.bytesToString(memoryUsed.toLong)} /
-        {Utils.bytesToString(maximumMemory.toLong)}
-      </td>
-      <td sorttable_customkey={diskUsed}>
-        {Utils.bytesToString(diskUsed.toLong)}
-      </td>
-      <td>{values("Active Tasks")}</td>
-      <td>{values("Failed Tasks")}</td>
-      <td>{values("Complete Tasks")}</td>
-      <td>{values("Total Tasks")}</td>
-      <td>{Utils.msDurationToString(values("Task Time").toLong)}</td>
-      <td>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
-      <td>{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
-    </tr>
-  }
-
-  /** Represent an executor's info as a map given a storage status index */
-  private def getExecInfo(statusId: Int): Map[String, String] = {
-    val status = listener.storageStatusList(statusId)
-    val execId = status.blockManagerId.executorId
-    val hostPort = status.blockManagerId.hostPort
-    val rddBlocks = status.blocks.size
-    val memUsed = status.memUsed()
-    val maxMem = status.maxMem
-    val diskUsed = status.diskUsed()
-    val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
-    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
-    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
-    val totalTasks = activeTasks + failedTasks + completedTasks
-    val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
-    val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
-    val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
-
-    // Also include fields not in the header
-    val execFields = execHeader ++ Seq("Maximum Memory")
-
-    val execValues = Seq(
-      execId,
-      hostPort,
-      rddBlocks,
-      memUsed,
-      diskUsed,
-      activeTasks,
-      failedTasks,
-      completedTasks,
-      totalTasks,
-      totalDuration,
-      totalShuffleRead,
-      totalShuffleWrite,
-      maxMem
-    ).map(_.toString)
-
-    execFields.zip(execValues).toMap
-  }
-}
-
-/**
- * A SparkListener that prepares information to be displayed on the ExecutorsUI
- */
-private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
-  val executorToTasksActive = HashMap[String, Int]()
-  val executorToTasksComplete = HashMap[String, Int]()
-  val executorToTasksFailed = HashMap[String, Int]()
-  val executorToDuration = HashMap[String, Long]()
-  val executorToShuffleRead = HashMap[String, Long]()
-  val executorToShuffleWrite = HashMap[String, Long]()
-
-  def storageStatusList = storageStatusListener.storageStatusList
-
-  override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val eid = formatExecutorId(taskStart.taskInfo.executorId)
-    executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val info = taskEnd.taskInfo
-    if (info != null) {
-      val eid = formatExecutorId(info.executorId)
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
-      executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
-      taskEnd.reason match {
-        case e: ExceptionFailure =>
-          executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
-        case _ =>
-          executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
-      }
-
-      // Update shuffle read/write
-      val metrics = taskEnd.taskMetrics
-      if (metrics != null) {
-        metrics.shuffleReadMetrics.foreach { shuffleRead =>
-          executorToShuffleRead(eid) =
-            executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
-        }
-        metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
-          executorToShuffleWrite(eid) =
-            executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
-        }
-      }
-    }
-  }
-
-  // This addresses executor ID inconsistencies in the local mode
-  private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 73861ae..c83e196 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -20,11 +20,12 @@ package org.apache.spark.ui.jobs
 import scala.collection.mutable
 import scala.xml.Node
 
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
 /** Page showing executor summary */
-private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
-  private lazy val listener = parent.listener
+private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
+  private val listener = parent.listener
 
   def toNodeSeq: Seq[Node] = {
     listener.synchronized {
@@ -69,7 +70,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
           <tr>
             <td>{k}</td>
             <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
-            <td>{parent.formatDuration(v.taskTime)}</td>
+            <td>{UIUtils.formatDuration(v.taskTime)}</td>
             <td>{v.failedTasks + v.succeededTasks}</td>
             <td>{v.failedTasks}</td>
             <td>{v.succeededTasks}</td>

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
deleted file mode 100644
index 8619a31..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.{Node, NodeSeq}
-
-import org.apache.spark.scheduler.Schedulable
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
-
-/** Page showing list of all ongoing and recently finished stages and pools */
-private[ui] class IndexPage(parent: JobProgressUI) {
-  private val basePath = parent.basePath
-  private val live = parent.live
-  private val sc = parent.sc
-  private lazy val listener = parent.listener
-  private lazy val isFairScheduler = parent.isFairScheduler
-
-  private def appName = parent.appName
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    listener.synchronized {
-      val activeStages = listener.activeStages.values.toSeq
-      val completedStages = listener.completedStages.reverse.toSeq
-      val failedStages = listener.failedStages.reverse.toSeq
-      val now = System.currentTimeMillis()
-
-      val activeStagesTable =
-        new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
-      val completedStagesTable =
-        new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
-      val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
-
-      // For now, pool information is only accessible in live UIs
-      val pools = if (live) sc.getAllPools else Seq[Schedulable]()
-      val poolTable = new PoolTable(pools, parent)
-
-      val summary: NodeSeq =
-        <div>
-          <ul class="unstyled">
-            {if (live) {
-              // Total duration is not meaningful unless the UI is live
-              <li>
-                <strong>Total Duration: </strong>
-                {parent.formatDuration(now - sc.startTime)}
-              </li>
-            }}
-            <li>
-              <strong>Scheduling Mode: </strong>
-              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
-            </li>
-            <li>
-              <a href="#active"><strong>Active Stages:</strong></a>
-              {activeStages.size}
-            </li>
-            <li>
-              <a href="#completed"><strong>Completed Stages:</strong></a>
-              {completedStages.size}
-            </li>
-             <li>
-             <a href="#failed"><strong>Failed Stages:</strong></a>
-              {failedStages.size}
-            </li>
-          </ul>
-        </div>
-
-      val content = summary ++
-        {if (live && isFairScheduler) {
-          <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
-        } else {
-          Seq[Node]()
-        }} ++
-        <h4 id="active">Active Stages ({activeStages.size})</h4> ++
-        activeStagesTable.toNodeSeq ++
-        <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
-        completedStagesTable.toNodeSeq ++
-        <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
-        failedStagesTable.toNodeSeq
-
-      UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", Stages)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 5167e20..0db4afa 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -222,12 +222,10 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
 
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
-      val schedulingModeName =
-        environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode")
-      schedulingMode = schedulingModeName match {
-        case Some(name) => Some(SchedulingMode.withName(name))
-        case None => None
-      }
+      schedulingMode = environmentUpdate
+        .environmentDetails("Spark Properties").toMap
+        .get("spark.scheduler.mode")
+        .map(SchedulingMode.withName)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
new file mode 100644
index 0000000..34ff2ac
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, NodeSeq}
+
+import org.apache.spark.scheduler.Schedulable
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+
+/** Page showing list of all ongoing and recently finished stages and pools */
+private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
+  private val appName = parent.appName
+  private val basePath = parent.basePath
+  private val live = parent.live
+  private val sc = parent.sc
+  private val listener = parent.listener
+  private lazy val isFairScheduler = parent.isFairScheduler
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    listener.synchronized {
+      val activeStages = listener.activeStages.values.toSeq
+      val completedStages = listener.completedStages.reverse.toSeq
+      val failedStages = listener.failedStages.reverse.toSeq
+      val now = System.currentTimeMillis
+
+      val activeStagesTable =
+        new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
+      val completedStagesTable =
+        new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
+      val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
+
+      // For now, pool information is only accessible in live UIs
+      val pools = if (live) sc.getAllPools else Seq[Schedulable]()
+      val poolTable = new PoolTable(pools, parent)
+
+      val summary: NodeSeq =
+        <div>
+          <ul class="unstyled">
+            {if (live) {
+              // Total duration is not meaningful unless the UI is live
+              <li>
+                <strong>Total Duration: </strong>
+                {UIUtils.formatDuration(now - sc.startTime)}
+              </li>
+            }}
+            <li>
+              <strong>Scheduling Mode: </strong>
+              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
+            </li>
+            <li>
+              <a href="#active"><strong>Active Stages:</strong></a>
+              {activeStages.size}
+            </li>
+            <li>
+              <a href="#completed"><strong>Completed Stages:</strong></a>
+              {completedStages.size}
+            </li>
+             <li>
+             <a href="#failed"><strong>Failed Stages:</strong></a>
+              {failedStages.size}
+            </li>
+          </ul>
+        </div>
+
+      val content = summary ++
+        {if (live && isFairScheduler) {
+          <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
+        } else {
+          Seq[Node]()
+        }} ++
+        <h4 id="active">Active Stages ({activeStages.size})</h4> ++
+        activeStagesTable.toNodeSeq ++
+        <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
+        completedStagesTable.toNodeSeq ++
+        <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
+        failedStagesTable.toNodeSeq
+
+      UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", parent.headerTabs, parent)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
new file mode 100644
index 0000000..3308c8c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.ui.{SparkUI, WebUITab}
+
+/** Web UI showing progress status of all jobs in the given SparkContext. */
+private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") {
+  val appName = parent.appName
+  val basePath = parent.basePath
+  val live = parent.live
+  val sc = parent.sc
+  val conf = if (live) sc.conf else new SparkConf
+  val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
+  val listener = new JobProgressListener(conf)
+
+  attachPage(new JobProgressPage(this))
+  attachPage(new StagePage(this))
+  attachPage(new PoolPage(this))
+  parent.registerListener(listener)
+
+  def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
+
+  def handleKillRequest(request: HttpServletRequest) =  {
+    if (killEnabled) {
+      val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+      val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
+      if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
+        sc.cancelStage(stageId)
+      }
+      // Do a quick pause here to give Spark time to kill the stage so it shows up as
+      // killed after the refresh. Note that this will block the serving thread so the
+      // time should be limited in duration.
+      Thread.sleep(100)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
deleted file mode 100644
index 30e3f35..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
-
-/** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobProgressUI(parent: SparkUI) {
-  val basePath = parent.basePath
-  val live = parent.live
-  val sc = parent.sc
-  val killEnabled = parent.conf.getBoolean("spark.ui.killEnabled", true)
-
-  lazy val listener = _listener.get
-  lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
-
-  private val indexPage = new IndexPage(this)
-  private val stagePage = new StagePage(this)
-  private val poolPage = new PoolPage(this)
-  private var _listener: Option[JobProgressListener] = None
-
-  def appName = parent.appName
-
-  def start() {
-    val conf = if (live) sc.conf else new SparkConf
-    _listener = Some(new JobProgressListener(conf))
-  }
-
-  def formatDuration(ms: Long) = Utils.msDurationToString(ms)
-
-  private def handleKillRequest(request: HttpServletRequest) =  {
-    if (killEnabled) {
-      val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
-      val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
-      if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
-        sc.cancelStage(stageId)
-      }
-      // Do a quick pause here to give Spark time to kill the stage so it shows up as
-      // killed after the refresh. Note that this will block the serving thread so the
-      // time should be limited in duration.
-      Thread.sleep(100)
-    }
-  }
-
-  def getHandlers = Seq[ServletContextHandler](
-    createRedirectHandler("/stages/stage/kill", "/stages", handleKillRequest),
-    createServletHandler("/stages/stage",
-      (request: HttpServletRequest) => stagePage.render(request), parent.securityManager, basePath),
-    createServletHandler("/stages/pool",
-      (request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
-    createServletHandler("/stages",
-      (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
-  )
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 3638e60..fd83d37 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -22,17 +22,15 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.scheduler.{Schedulable, StageInfo}
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 
 /** Page showing specific pool details */
-private[ui] class PoolPage(parent: JobProgressUI) {
+private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
+  private val appName = parent.appName
   private val basePath = parent.basePath
   private val live = parent.live
   private val sc = parent.sc
-  private lazy val listener = parent.listener
-
-  private def appName = parent.appName
+  private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
@@ -52,8 +50,8 @@ private[ui] class PoolPage(parent: JobProgressUI) {
         <h4>Summary </h4> ++ poolTable.toNodeSeq ++
         <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq
 
-      UIUtils.headerSparkPage(
-        content, basePath, appName, "Fair Scheduler Pool: " + poolName, Stages)
+      UIUtils.headerSparkPage(content, basePath, appName, "Fair Scheduler Pool: " + poolName,
+        parent.headerTabs, parent)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index c5c8d86..f4b68f2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -24,10 +24,9 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo}
 import org.apache.spark.ui.UIUtils
 
 /** Table showing list of pools */
-private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
+private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
   private val basePath = parent.basePath
-  private val poolToActiveStages = listener.poolToActiveStages
-  private lazy val listener = parent.listener
+  private val listener = parent.listener
 
   def toNodeSeq: Seq[Node] = {
     listener.synchronized {
@@ -48,7 +47,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
         <th>SchedulingMode</th>
       </thead>
       <tbody>
-        {rows.map(r => makeRow(r, poolToActiveStages))}
+        {rows.map(r => makeRow(r, listener.poolToActiveStages))}
       </tbody>
     </table>
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index b6c3e3c..4bce472 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -22,17 +22,14 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.{Utils, Distribution}
 
 /** Page showing statistics and task list for a given stage */
-private[ui] class StagePage(parent: JobProgressUI) {
+private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
+  private val appName = parent.appName
   private val basePath = parent.basePath
-  private lazy val listener = parent.listener
-  private lazy val sc = parent.sc
-
-  private def appName = parent.appName
+  private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
@@ -44,8 +41,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
             <h4>Summary Metrics</h4> No tasks have started yet
             <h4>Tasks</h4> No tasks have started yet
           </div>
-        return UIUtils.headerSparkPage(
-          content, basePath, appName, "Details for Stage %s".format(stageId), Stages)
+        return UIUtils.headerSparkPage(content, basePath, appName,
+          "Details for Stage %s".format(stageId), parent.headerTabs, parent)
       }
 
       val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
@@ -60,7 +57,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
       val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
 
       var activeTime = 0L
-      val now = System.currentTimeMillis()
+      val now = System.currentTimeMillis
       val tasksActive = listener.stageIdToTasksActive(stageId).values
       tasksActive.foreach(activeTime += _.timeRunning(now))
 
@@ -70,7 +67,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
           <ul class="unstyled">
             <li>
               <strong>Total task time across all tasks: </strong>
-              {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+              {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
             </li>
             {if (hasShuffleRead)
               <li>
@@ -121,13 +118,13 @@ private[ui] class StagePage(parent: JobProgressUI) {
           }
           val serializationQuantiles =
             "Result serialization time" +: Distribution(serializationTimes).
-              get.getQuantiles().map(ms => parent.formatDuration(ms.toLong))
+              get.getQuantiles().map(ms => UIUtils.formatDuration(ms.toLong))
 
           val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
             metrics.get.executorRunTime.toDouble
           }
           val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles()
-            .map(ms => parent.formatDuration(ms.toLong))
+            .map(ms => UIUtils.formatDuration(ms.toLong))
 
           val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
             if (info.gettingResultTime > 0) {
@@ -138,7 +135,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
           }
           val gettingResultQuantiles = "Time spent fetching task results" +:
             Distribution(gettingResultTimes).get.getQuantiles().map { millis =>
-              parent.formatDuration(millis.toLong)
+              UIUtils.formatDuration(millis.toLong)
             }
           // The scheduler delay includes the network delay to send the task to the worker
           // machine and to send back the result (but not the time to fetch the task result,
@@ -155,7 +152,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
           }
           val schedulerDelayQuantiles = "Scheduler delay" +:
             Distribution(schedulerDelays).get.getQuantiles().map { millis =>
-              parent.formatDuration(millis.toLong)
+              UIUtils.formatDuration(millis.toLong)
             }
 
           def getQuantileCols(data: Seq[Double]) =
@@ -206,8 +203,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
         <h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
         <h4>Tasks</h4> ++ taskTable
 
-      UIUtils.headerSparkPage(
-        content, basePath, appName, "Details for Stage %d".format(stageId), Stages)
+      UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId),
+        parent.headerTabs, parent)
     }
   }
 
@@ -219,8 +216,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
     taskData match { case TaskUIData(info, metrics, exception) =>
       val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
         else metrics.map(_.executorRunTime).getOrElse(1L)
-      val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
-        else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+      val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
+        else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
       val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
       val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
 
@@ -235,8 +232,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
 
       val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
       val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
-      val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms =>
-        if (ms == 0) "" else parent.formatDuration(ms)
+      val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
+        if (ms == 0) "" else UIUtils.formatDuration(ms)
       }.getOrElse("")
 
       val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
@@ -254,15 +251,15 @@ private[ui] class StagePage(parent: JobProgressUI) {
         <td>{info.status}</td>
         <td>{info.taskLocality}</td>
         <td>{info.host}</td>
-        <td>{WebUI.formatDate(new Date(info.launchTime))}</td>
+        <td>{UIUtils.formatDate(new Date(info.launchTime))}</td>
         <td sorttable_customkey={duration.toString}>
           {formatDuration}
         </td>
         <td sorttable_customkey={gcTime.toString}>
-          {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+          {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
         </td>
         <td sorttable_customkey={serializationTime.toString}>
-          {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+          {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
         </td>
         {if (shuffleRead) {
            <td sorttable_customkey={shuffleReadSortable}>

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index e419fae..8c5b1f5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -23,17 +23,17 @@ import scala.collection.mutable.HashMap
 import scala.xml.Node
 
 import org.apache.spark.scheduler.{StageInfo, TaskInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished stages */
 private[ui] class StageTable(
-  stages: Seq[StageInfo],
-  parent: JobProgressUI,
-  killEnabled: Boolean = false) {
+    stages: Seq[StageInfo],
+    parent: JobProgressTab,
+    killEnabled: Boolean = false) {
 
   private val basePath = parent.basePath
-  private lazy val listener = parent.listener
+  private val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
 
   def toNodeSeq: Seq[Node] = {
@@ -89,25 +89,23 @@ private[ui] class StageTable(
         {s.name}
       </a>
 
-    val description = listener.stageIdToDescription.get(s.stageId)
+    listener.stageIdToDescription.get(s.stageId)
       .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
       .getOrElse(<div> {killLink}{nameLink}</div>)
-
-    return description
   }
 
   /** Render an HTML row that represents a stage */
   private def stageRow(s: StageInfo): Seq[Node] = {
     val poolName = listener.stageIdToPool.get(s.stageId)
     val submissionTime = s.submissionTime match {
-      case Some(t) => WebUI.formatDate(new Date(t))
+      case Some(t) => UIUtils.formatDate(new Date(t))
       case None => "Unknown"
     }
     val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
     val duration = s.submissionTime.map { t =>
       if (finishTime > t) finishTime - t else System.currentTimeMillis - t
     }
-    val formattedDuration = duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")
+    val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
     val startedTasks =
       listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
     val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
deleted file mode 100644
index 16996a2..0000000
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.storage
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.collection.mutable
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.scheduler._
-import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
-
-/** Web UI showing storage status of all RDD's in the given SparkContext. */
-private[ui] class BlockManagerUI(parent: SparkUI) {
-  val basePath = parent.basePath
-
-  private val indexPage = new IndexPage(this)
-  private val rddPage = new RDDPage(this)
-  private var _listener: Option[BlockManagerListener] = None
-
-  lazy val listener = _listener.get
-
-  def appName = parent.appName
-
-  def start() {
-    _listener = Some(new BlockManagerListener(parent.storageStatusListener))
-  }
-
-  def getHandlers = Seq[ServletContextHandler](
-    createServletHandler("/storage/rdd",
-      (request: HttpServletRequest) => rddPage.render(request), parent.securityManager, basePath),
-    createServletHandler("/storage",
-      (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
-  )
-}
-
-/**
- * A SparkListener that prepares information to be displayed on the BlockManagerUI
- */
-private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
-  private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
-
-  def storageStatusList = storageStatusListener.storageStatusList
-
-  /** Filter RDD info to include only those with cached partitions */
-  def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
-
-  /** Update each RDD's info to reflect any updates to the RDD's storage status */
-  private def updateRDDInfo() {
-    val rddInfos = _rddInfoMap.values.toSeq
-    val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
-    updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
-  }
-
-  /**
-   * Assumes the storage status list is fully up-to-date. This implies the corresponding
-   * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
-   */
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val metrics = taskEnd.taskMetrics
-    if (metrics != null && metrics.updatedBlocks.isDefined) {
-      updateRDDInfo()
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
-    val rddInfo = stageSubmitted.stageInfo.rddInfo
-    _rddInfoMap.getOrElseUpdate(rddInfo.id, rddInfo)
-  }
-
-  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
-    // Remove all partitions that are no longer cached
-    _rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 }
-  }
-
-  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
-    updateRDDInfo()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
deleted file mode 100644
index 4f6acc3..0000000
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.storage
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
-import org.apache.spark.util.Utils
-
-/** Page showing list of RDD's currently stored in the cluster */
-private[ui] class IndexPage(parent: BlockManagerUI) {
-  private val basePath = parent.basePath
-  private lazy val listener = parent.listener
-
-  private def appName = parent.appName
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-
-    val rdds = listener.rddInfoList
-    val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
-    UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage)
-  }
-
-  /** Header fields for the RDD table */
-  private def rddHeader = Seq(
-    "RDD Name",
-    "Storage Level",
-    "Cached Partitions",
-    "Fraction Cached",
-    "Size in Memory",
-    "Size in Tachyon",
-    "Size on Disk")
-
-  /** Render an HTML row representing an RDD */
-  private def rddRow(rdd: RDDInfo): Seq[Node] = {
-    <tr>
-      <td>
-        <a href={"%s/storage/rdd?id=%s".format(UIUtils.prependBaseUri(basePath), rdd.id)}>
-          {rdd.name}
-        </a>
-      </td>
-      <td>{rdd.storageLevel.description}
-      </td>
-      <td>{rdd.numCachedPartitions}</td>
-      <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
-      <td>{Utils.bytesToString(rdd.memSize)}</td>
-      <td>{Utils.bytesToString(rdd.tachyonSize)}</td>
-      <td>{Utils.bytesToString(rdd.diskSize)}</td>
-    </tr>
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 75ee997..d07f1c9 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -22,23 +22,22 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils}
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
 /** Page showing storage details for a given RDD */
-private[ui] class RDDPage(parent: BlockManagerUI) {
+private[ui] class RddPage(parent: StorageTab) extends WebUIPage("rdd") {
+  private val appName = parent.appName
   private val basePath = parent.basePath
-  private lazy val listener = parent.listener
-
-  private def appName = parent.appName
+  private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val rddId = request.getParameter("id").toInt
     val storageStatusList = listener.storageStatusList
     val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
       // Rather than crashing, render an "RDD Not Found" page
-      return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", Storage)
+      return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found",
+        parent.headerTabs, parent)
     }
 
     // Worker table
@@ -96,8 +95,8 @@ private[ui] class RDDPage(parent: BlockManagerUI) {
         </div>
       </div>;
 
-    UIUtils.headerSparkPage(
-      content, basePath, appName, "RDD Storage Info for " + rddInfo.name, Storage)
+    UIUtils.headerSparkPage(content, basePath, appName, "RDD Storage Info for " + rddInfo.name,
+      parent.headerTabs, parent)
   }
 
   /** Header fields for the worker table */

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
new file mode 100644
index 0000000..b66edd9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+/** Page listing the RDDs reported by the parent StorageTab's listener, one table row per RDD */
+private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
+  private val appName = parent.appName  // read once, when this page is constructed
+  private val basePath = parent.basePath
+  private val listener = parent.listener
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val rdds = listener.rddInfoList  // NOTE(review): no listener.synchronized here; confirm
+    val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
+    UIUtils.headerSparkPage(content, basePath, appName, "Storage ", parent.headerTabs, parent)
+  }
+
+  /** Column titles for the RDD table, in the same order as the cells emitted by rddRow */
+  private def rddHeader = Seq(
+    "RDD Name",
+    "Storage Level",
+    "Cached Partitions",
+    "Fraction Cached",
+    "Size in Memory",
+    "Size in Tachyon",
+    "Size on Disk")
+
+  /** Render one table row for an RDD; the name cell links to the storage/rdd page for its id */
+  private def rddRow(rdd: RDDInfo): Seq[Node] = {  // NOTE(review): numPartitions == 0 unguarded
+    <tr>
+      <td>
+        <a href={"%s/storage/rdd?id=%s".format(UIUtils.prependBaseUri(basePath), rdd.id)}>
+          {rdd.name}
+        </a>
+      </td>
+      <td>{rdd.storageLevel.description}
+      </td>
+      <td>{rdd.numCachedPartitions}</td>
+      <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
+      <td>{Utils.bytesToString(rdd.memSize)}</td>
+      <td>{Utils.bytesToString(rdd.tachyonSize)}</td>
+      <td>{Utils.bytesToString(rdd.diskSize)}</td>
+    </tr>
+  }
+}