Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/12 08:34:05 UTC
[2/3] [SPARK-1386] Web UI for Spark Streaming
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index a7cf04b..6a2d652 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -17,16 +17,115 @@
package org.apache.spark.ui
+import java.text.SimpleDateFormat
+import java.util.{Locale, Date}
+
import scala.xml.Node
+import org.apache.spark.Logging
/** Utility functions for generating XML pages with spark content. */
-private[spark] object UIUtils {
+private[spark] object UIUtils extends Logging {
+
+ // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+ private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+ }
+
+ def formatDate(date: Date): String = dateFormat.get.format(date)
+
+ def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
+
+ def formatDuration(milliseconds: Long): String = {
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return "%.0f s".format(seconds)
+ }
+ val minutes = seconds / 60
+ if (minutes < 10) {
+ return "%.1f min".format(minutes)
+ } else if (minutes < 60) {
+ return "%.0f min".format(minutes)
+ }
+ val hours = minutes / 60
+ "%.1f h".format(hours)
+ }
+
+ /** Generate a verbose human-readable string representing a duration such as "5 seconds 35 ms" */
+ def formatDurationVerbose(ms: Long): String = {
+ try {
+ val second = 1000L
+ val minute = 60 * second
+ val hour = 60 * minute
+ val day = 24 * hour
+ val week = 7 * day
+ val year = 365 * day
+
+ def toString(num: Long, unit: String): String = {
+ if (num == 0) {
+ ""
+ } else if (num == 1) {
+ s"$num $unit"
+ } else {
+ s"$num ${unit}s"
+ }
+ }
+
+ val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms"
+ val secondString = toString((ms % minute) / second, "second")
+ val minuteString = toString((ms % hour) / minute, "minute")
+ val hourString = toString((ms % day) / hour, "hour")
+ val dayString = toString((ms % week) / day, "day")
+ val weekString = toString((ms % year) / week, "week")
+ val yearString = toString(ms / year, "year")
- import Page._
+ Seq(
+ second -> millisecondsString,
+ minute -> s"$secondString $millisecondsString",
+ hour -> s"$minuteString $secondString",
+ day -> s"$hourString $minuteString $secondString",
+ week -> s"$dayString $hourString $minuteString",
+ year -> s"$weekString $dayString $hourString"
+ ).foreach { case (durationLimit, durationString) =>
+ if (ms < durationLimit) {
+ // if the time is less than this limit (up to a year)
+ return durationString
+ }
+ }
+ // if time is more than a year
+ return s"$yearString $weekString $dayString"
+ } catch {
+ case e: Exception =>
+ logError("Error converting time to string", e)
+ // if there is some error, return blank string
+ return ""
+ }
+ }
+
+ /** Generate a human-readable string representing a number (e.g. 100 K) */
+ def formatNumber(records: Double): String = {
+ val trillion = 1e12
+ val billion = 1e9
+ val million = 1e6
+ val thousand = 1e3
+
+ val (value, unit) = {
+ if (records >= 2*trillion) {
+ (records / trillion, " T")
+ } else if (records >= 2*billion) {
+ (records / billion, " B")
+ } else if (records >= 2*million) {
+ (records / million, " M")
+ } else if (records >= 2*thousand) {
+ (records / thousand, " K")
+ } else {
+ (records, "")
+ }
+ }
+ "%.1f%s".formatLocal(Locale.US, value, unit)
+ }
// Yarn has to go through a proxy so the base uri is provided and has to be on all links
- private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
- getOrElse("")
+ val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
@@ -36,26 +135,14 @@ private[spark] object UIUtils {
basePath: String,
appName: String,
title: String,
- page: Page.Value) : Seq[Node] = {
- val jobs = page match {
- case Stages =>
- <li class="active"><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
- case _ => <li><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
- }
- val storage = page match {
- case Storage =>
- <li class="active"><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
- case _ => <li><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
- }
- val environment = page match {
- case Environment =>
- <li class="active"><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
- case _ => <li><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
- }
- val executors = page match {
- case Executors =>
- <li class="active"><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
- case _ => <li><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
+ tabs: Seq[WebUITab],
+ activeTab: WebUITab,
+ refreshInterval: Option[Int] = None): Seq[Node] = {
+
+ val header = tabs.map { tab =>
+ <li class={if (tab == activeTab) "active" else ""}>
+ <a href={prependBaseUri(basePath, "/" + tab.prefix)}>{tab.name}</a>
+ </li>
}
<html>
@@ -74,16 +161,10 @@ private[spark] object UIUtils {
<a href={prependBaseUri(basePath, "/")} class="brand">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
</a>
- <ul class="nav">
- {jobs}
- {storage}
- {environment}
- {executors}
- </ul>
+ <ul class="nav">{header}</ul>
<p class="navbar-text pull-right"><strong>{appName}</strong> application UI</p>
</div>
</div>
-
<div class="container-fluid">
<div class="row-fluid">
<div class="span12">
@@ -129,21 +210,36 @@ private[spark] object UIUtils {
/** Returns an HTML table constructed by generating a row for each object in a sequence. */
def listingTable[T](
headers: Seq[String],
- makeRow: T => Seq[Node],
- rows: Seq[T],
+ generateDataRow: T => Seq[Node],
+ data: Seq[T],
fixedWidth: Boolean = false): Seq[Node] = {
- val colWidth = 100.toDouble / headers.size
- val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
var tableClass = "table table-bordered table-striped table-condensed sortable"
if (fixedWidth) {
tableClass += " table-fixed"
}
-
+ val colWidth = 100.toDouble / headers.size
+ val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
+ val headerRow: Seq[Node] = {
+ // if none of the headers have "\n" in them
+ if (headers.forall(!_.contains("\n"))) {
+ // represent header as simple text
+ headers.map(h => <th width={colWidthAttr}>{h}</th>)
+ } else {
+ // represent header text as list while respecting "\n"
+ headers.map { case h =>
+ <th width={colWidthAttr}>
+ <ul class="unstyled">
+ { h.split("\n").map { case t => <li> {t} </li> } }
+ </ul>
+ </th>
+ }
+ }
+ }
<table class={tableClass}>
- <thead>{headers.map(h => <th width={colWidthAttr}>{h}</th>)}</thead>
+ <thead>{headerRow}</thead>
<tbody>
- {rows.map(r => makeRow(r))}
+ {data.map(r => generateDataRow(r))}
</tbody>
</table>
}
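The formatting helpers added to UIUtils are pure functions, so their behavior is easy to check in isolation. Below is a minimal sketch of what they return for a few inputs; the expected values follow from the cutoffs in the diff above, and the wrapper object is illustrative only. Since UIUtils is private[spark], the sketch has to live inside the org.apache.spark package.

package org.apache.spark.ui

object UIUtilsFormattingExample {
  def main(args: Array[String]) {
    // Durations under one minute render as whole seconds
    println(UIUtils.formatDuration(5400))         // "5 s"
    // Between 1 and 10 minutes, one decimal place is kept
    println(UIUtils.formatDuration(90 * 1000))    // "1.5 min"
    // The verbose form pluralizes each unit and appends leftover millis
    println(UIUtils.formatDurationVerbose(5035))  // "5 seconds 35 ms"
    // Numbers are only abbreviated once they reach twice the unit
    println(UIUtils.formatNumber(1999.0))         // "1999.0"
    println(UIUtils.formatNumber(2.5e6))          // "2.5 M"
  }
}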
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 2cc7582..b08f308 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -17,53 +17,134 @@
package org.apache.spark.ui
-import java.text.SimpleDateFormat
-import java.util.Date
+import javax.servlet.http.HttpServletRequest
-private[spark] abstract class WebUI(name: String) {
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.Node
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.json4s.JsonAST.{JNothing, JValue}
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
+
+/**
+ * The top level component of the UI hierarchy that contains the server.
+ *
+ * Each WebUI represents a collection of tabs, each of which in turn represents a collection of
+ * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
+ */
+private[spark] abstract class WebUI(
+ securityManager: SecurityManager,
+ port: Int,
+ conf: SparkConf,
+ basePath: String = "")
+ extends Logging {
+
+ protected val tabs = ArrayBuffer[WebUITab]()
+ protected val handlers = ArrayBuffer[ServletContextHandler]()
protected var serverInfo: Option[ServerInfo] = None
+ protected val localHostName = Utils.localHostName()
+ protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+ private val className = Utils.getFormattedClassName(this)
+
+ def getTabs: Seq[WebUITab] = tabs.toSeq
+ def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
+
+ /** Attach a tab to this UI, along with all of its attached pages. */
+ def attachTab(tab: WebUITab) {
+ tab.pages.foreach(attachPage)
+ tabs += tab
+ }
+
+ /** Attach a page to this UI. */
+ def attachPage(page: WebUIPage) {
+ val pagePath = "/" + page.prefix
+ attachHandler(createServletHandler(pagePath,
+ (request: HttpServletRequest) => page.render(request), securityManager, basePath))
+ attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
+ (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
+ }
+
+ /** Attach a handler to this UI. */
+ def attachHandler(handler: ServletContextHandler) {
+ handlers += handler
+ serverInfo.foreach { info =>
+ info.rootHandler.addHandler(handler)
+ if (!handler.isStarted) {
+ handler.start()
+ }
+ }
+ }
- /**
- * Bind to the HTTP server behind this web interface.
- * Overridden implementation should set serverInfo.
- */
- def bind() { }
+ /** Detach a handler from this UI. */
+ def detachHandler(handler: ServletContextHandler) {
+ handlers -= handler
+ serverInfo.foreach { info =>
+ info.rootHandler.removeHandler(handler)
+ if (handler.isStarted) {
+ handler.stop()
+ }
+ }
+ }
+
+ /** Initialize all components of the server. */
+ def initialize()
+
+ /** Bind to the HTTP server behind this web interface. */
+ def bind() {
+ assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
+ try {
+ serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+ logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
+ } catch {
+ case e: Exception =>
+ logError("Failed to bind %s".format(className), e)
+ System.exit(1)
+ }
+ }
/** Return the actual port to which this server is bound. Only valid after bind(). */
def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
/** Stop the server behind this web interface. Only valid after bind(). */
def stop() {
- assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(name))
+ assert(serverInfo.isDefined,
+ "Attempted to stop %s before binding to a server!".format(className))
serverInfo.get.server.stop()
}
}
+
/**
- * Utilities used throughout the web UI.
+ * A tab that represents a collection of pages.
+ * The prefix is appended to the parent address to form a full path, and must not contain slashes.
*/
-private[spark] object WebUI {
- // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
- private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
- override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
+ val pages = ArrayBuffer[WebUIPage]()
+ val name = prefix.capitalize
+
+ /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
+ def attachPage(page: WebUIPage) {
+ page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
+ pages += page
}
- def formatDate(date: Date): String = dateFormat.get.format(date)
+ /** Get a list of header tabs from the parent UI. */
+ def headerTabs: Seq[WebUITab] = parent.getTabs
+}
- def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
- def formatDuration(milliseconds: Long): String = {
- val seconds = milliseconds.toDouble / 1000
- if (seconds < 60) {
- return "%.0f s".format(seconds)
- }
- val minutes = seconds / 60
- if (minutes < 10) {
- return "%.1f min".format(minutes)
- } else if (minutes < 60) {
- return "%.0f min".format(minutes)
- }
- val hours = minutes / 60
- "%.1f h".format(hours)
- }
+/**
+ * A page that represents the leaf node in the UI hierarchy.
+ *
+ * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab.
+ * If the parent is a WebUI, the prefix is appended to the parent's address to form a full path.
+ * Otherwise, if the parent is a WebUITab, the prefix is appended to the parent tab's own
+ * prefix to form a relative path. The prefix must not contain slashes.
+ */
+private[spark] abstract class WebUIPage(var prefix: String) {
+ def render(request: HttpServletRequest): Seq[Node]
+ def renderJson(request: HttpServletRequest): JValue = JNothing
}
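Under the new hierarchy, adding a page no longer means wiring Jetty handlers by hand: attachTab registers a servlet (plus a companion /json endpoint) for every page of the tab. A minimal sketch of a custom tab against this API follows; FooTab and FooPage are hypothetical names used for illustration, not part of this patch.

package org.apache.spark.ui.foo

import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{SparkUI, UIUtils, WebUIPage, WebUITab}

// A hypothetical tab served under /foo, following the EnvironmentTab pattern
private[ui] class FooTab(parent: SparkUI) extends WebUITab(parent, "foo") {
  val appName = parent.appName
  val basePath = parent.basePath
  // attachPage prepends this tab's prefix, so the page is served at /foo
  attachPage(new FooPage(this))
}

// The leaf page only has to produce XML; handler registration is inherited
private[ui] class FooPage(parent: FooTab) extends WebUIPage("") {
  def render(request: HttpServletRequest): Seq[Node] = {
    val content = <p>Hello from the Foo tab</p>
    UIUtils.headerSparkPage(content, parent.basePath, parent.appName, "Foo",
      parent.headerTabs, parent)
  }
}

The parent UI would then call attachTab(new FooTab(this)) during its initialize().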
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
new file mode 100644
index 0000000..b347eb1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.env
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
+ private val appName = parent.appName
+ private val basePath = parent.basePath
+ private val listener = parent.listener
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val runtimeInformationTable = UIUtils.listingTable(
+ propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
+ val sparkPropertiesTable = UIUtils.listingTable(
+ propertyHeader, propertyRow, listener.sparkProperties, fixedWidth = true)
+ val systemPropertiesTable = UIUtils.listingTable(
+ propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
+ val classpathEntriesTable = UIUtils.listingTable(
+ classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
+ val content =
+ <span>
+ <h4>Runtime Information</h4> {runtimeInformationTable}
+ <h4>Spark Properties</h4> {sparkPropertiesTable}
+ <h4>System Properties</h4> {systemPropertiesTable}
+ <h4>Classpath Entries</h4> {classpathEntriesTable}
+ </span>
+
+ UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent)
+ }
+
+ private def propertyHeader = Seq("Name", "Value")
+ private def classPathHeaders = Seq("Resource", "Source")
+ private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+ private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+ private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
+}
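Each table on this page comes from the generic UIUtils.listingTable, driven by a header list, a row-rendering function, and the data. A small sketch of the same call shape with made-up property data; the wrapper object is illustrative, and it sits in org.apache.spark.ui because listingTable is private[spark].

package org.apache.spark.ui

import scala.xml.Node

object ListingTableExample {
  def main(args: Array[String]) {
    val headers = Seq("Name", "Value")
    // Same row shape as propertyRow in EnvironmentPage
    def row(kv: (String, String)): Seq[Node] =
      <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
    val data = Seq("spark.app.name" -> "demo", "spark.master" -> "local[2]")
    println(UIUtils.listingTable(headers, row, data, fixedWidth = true))
  }
}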
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
new file mode 100644
index 0000000..03b46e1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.env
+
+import org.apache.spark.scheduler._
+import org.apache.spark.ui._
+
+private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") {
+ val appName = parent.appName
+ val basePath = parent.basePath
+ val listener = new EnvironmentListener
+
+ attachPage(new EnvironmentPage(this))
+ parent.registerListener(listener)
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the EnvironmentTab
+ */
+private[ui] class EnvironmentListener extends SparkListener {
+ var jvmInformation = Seq[(String, String)]()
+ var sparkProperties = Seq[(String, String)]()
+ var systemProperties = Seq[(String, String)]()
+ var classpathEntries = Seq[(String, String)]()
+
+ override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
+ synchronized {
+ val environmentDetails = environmentUpdate.environmentDetails
+ jvmInformation = environmentDetails("JVM Information")
+ sparkProperties = environmentDetails("Spark Properties")
+ systemProperties = environmentDetails("System Properties")
+ classpathEntries = environmentDetails("Classpath Entries")
+ }
+ }
+}
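The tab constructor shows the new wiring pattern: it instantiates its listener, attaches its pages, and registers the listener with the parent UI, all at construction time. The listener itself just keeps a snapshot of the last environment update, guarded by synchronized so a concurrent render sees a consistent view. A small sketch of feeding it an update directly; the event payload is made up, and the object sits in the same package because the listener is private[ui].

package org.apache.spark.ui.env

import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate

object EnvironmentListenerExample {
  def main(args: Array[String]) {
    val listener = new EnvironmentListener
    val details = Map(
      "JVM Information" -> Seq("Java Version" -> "1.7.0_51"),
      "Spark Properties" -> Seq("spark.scheduler.mode" -> "FAIR"),
      "System Properties" -> Seq("os.name" -> "Linux"),
      "Classpath Entries" -> Seq("/opt/spark/lib" -> "System Classpath"))
    listener.onEnvironmentUpdate(SparkListenerEnvironmentUpdate(details))
    // The snapshot now reflects the update
    assert(listener.sparkProperties == Seq("spark.scheduler.mode" -> "FAIR"))
  }
}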
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
deleted file mode 100644
index 33df971..0000000
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.env
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.scheduler._
-import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.Page.Environment
-
-private[ui] class EnvironmentUI(parent: SparkUI) {
- private val basePath = parent.basePath
- private var _listener: Option[EnvironmentListener] = None
-
- private def appName = parent.appName
-
- lazy val listener = _listener.get
-
- def start() {
- _listener = Some(new EnvironmentListener)
- }
-
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/environment",
- (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
- )
-
- def render(request: HttpServletRequest): Seq[Node] = {
- val runtimeInformationTable = UIUtils.listingTable(
- propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
- val sparkPropertiesTable = UIUtils.listingTable(
- propertyHeader, propertyRow, listener.sparkProperties, fixedWidth = true)
- val systemPropertiesTable = UIUtils.listingTable(
- propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
- val classpathEntriesTable = UIUtils.listingTable(
- classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
- val content =
- <span>
- <h4>Runtime Information</h4> {runtimeInformationTable}
- <h4>Spark Properties</h4> {sparkPropertiesTable}
- <h4>System Properties</h4> {systemPropertiesTable}
- <h4>Classpath Entries</h4> {classpathEntriesTable}
- </span>
-
- UIUtils.headerSparkPage(content, basePath, appName, "Environment", Environment)
- }
-
- private def propertyHeader = Seq("Name", "Value")
- private def classPathHeaders = Seq("Resource", "Source")
- private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
- private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
- private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
-}
-
-/**
- * A SparkListener that prepares information to be displayed on the EnvironmentUI
- */
-private[ui] class EnvironmentListener extends SparkListener {
- var jvmInformation = Seq[(String, String)]()
- var sparkProperties = Seq[(String, String)]()
- var systemProperties = Seq[(String, String)]()
- var classpathEntries = Seq[(String, String)]()
-
- override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
- synchronized {
- val environmentDetails = environmentUpdate.environmentDetails
- jvmInformation = environmentDetails("JVM Information")
- sparkProperties = environmentDetails("Spark Properties")
- systemProperties = environmentDetails("System Properties")
- classpathEntries = environmentDetails("Classpath Entries")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
new file mode 100644
index 0000000..c1e69f6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
+ private val appName = parent.appName
+ private val basePath = parent.basePath
+ private val listener = parent.listener
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val storageStatusList = listener.storageStatusList
+ val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
+ val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
+ val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
+ val execInfoSorted = execInfo.sortBy(_.getOrElse("Executor ID", ""))
+ val execTable = UIUtils.listingTable(execHeader, execRow, execInfoSorted)
+
+ val content =
+ <div class="row-fluid">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>Memory:</strong>
+ {Utils.bytesToString(memUsed)} Used
+ ({Utils.bytesToString(maxMem)} Total) </li>
+ <li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
+ </ul>
+ </div>
+ </div>
+ <div class="row">
+ <div class="span12">
+ {execTable}
+ </div>
+ </div>;
+
+ UIUtils.headerSparkPage(content, basePath, appName, "Executors (" + execInfo.size + ")",
+ parent.headerTabs, parent)
+ }
+
+ /** Header fields for the executors table */
+ private def execHeader = Seq(
+ "Executor ID",
+ "Address",
+ "RDD Blocks",
+ "Memory Used",
+ "Disk Used",
+ "Active Tasks",
+ "Failed Tasks",
+ "Complete Tasks",
+ "Total Tasks",
+ "Task Time",
+ "Shuffle Read",
+ "Shuffle Write")
+
+ /** Render an HTML row representing an executor */
+ private def execRow(values: Map[String, String]): Seq[Node] = {
+ val maximumMemory = values("Maximum Memory")
+ val memoryUsed = values("Memory Used")
+ val diskUsed = values("Disk Used")
+ <tr>
+ <td>{values("Executor ID")}</td>
+ <td>{values("Address")}</td>
+ <td>{values("RDD Blocks")}</td>
+ <td sorttable_customkey={memoryUsed}>
+ {Utils.bytesToString(memoryUsed.toLong)} /
+ {Utils.bytesToString(maximumMemory.toLong)}
+ </td>
+ <td sorttable_customkey={diskUsed}>
+ {Utils.bytesToString(diskUsed.toLong)}
+ </td>
+ <td>{values("Active Tasks")}</td>
+ <td>{values("Failed Tasks")}</td>
+ <td>{values("Complete Tasks")}</td>
+ <td>{values("Total Tasks")}</td>
+ <td>{Utils.msDurationToString(values("Task Time").toLong)}</td>
+ <td>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
+ <td>{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
+ </tr>
+ }
+
+ /** Represent an executor's info as a map given a storage status index */
+ private def getExecInfo(statusId: Int): Map[String, String] = {
+ val status = listener.storageStatusList(statusId)
+ val execId = status.blockManagerId.executorId
+ val hostPort = status.blockManagerId.hostPort
+ val rddBlocks = status.blocks.size
+ val memUsed = status.memUsed()
+ val maxMem = status.maxMem
+ val diskUsed = status.diskUsed()
+ val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
+ val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
+ val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
+ val totalTasks = activeTasks + failedTasks + completedTasks
+ val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+ val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
+ val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
+
+ // Also include fields not in the header
+ val execFields = execHeader ++ Seq("Maximum Memory")
+
+ val execValues = Seq(
+ execId,
+ hostPort,
+ rddBlocks,
+ memUsed,
+ diskUsed,
+ activeTasks,
+ failedTasks,
+ completedTasks,
+ totalTasks,
+ totalDuration,
+ totalShuffleRead,
+ totalShuffleWrite,
+ maxMem
+ ).map(_.toString)
+
+ execFields.zip(execValues).toMap
+ }
+}
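Note the sorttable_customkey attributes in execRow: each memory and disk cell displays a human-readable string but carries the raw byte count as its sort key, so the column sorts numerically rather than alphabetically. The pattern in isolation, with a made-up value (a snippet suitable for the Scala REPL):

import scala.xml.Node

// Sorts by the raw byte count while displaying the pretty string;
// alphabetically "1.0 GB" would sort before "9.5 MB", the reverse of
// their numeric order
def byteCell(bytes: Long, pretty: String): Seq[Node] =
  <td sorttable_customkey={bytes.toString}>{pretty}</td>

println(byteCell(9961472L, "9.5 MB"))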
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
new file mode 100644
index 0000000..5678bf3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.StorageStatusListener
+import org.apache.spark.ui.{SparkUI, WebUITab}
+
+private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") {
+ val appName = parent.appName
+ val basePath = parent.basePath
+ val listener = new ExecutorsListener(parent.storageStatusListener)
+
+ attachPage(new ExecutorsPage(this))
+ parent.registerListener(listener)
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the ExecutorsTab
+ */
+private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
+ extends SparkListener {
+
+ val executorToTasksActive = HashMap[String, Int]()
+ val executorToTasksComplete = HashMap[String, Int]()
+ val executorToTasksFailed = HashMap[String, Int]()
+ val executorToDuration = HashMap[String, Long]()
+ val executorToShuffleRead = HashMap[String, Long]()
+ val executorToShuffleWrite = HashMap[String, Long]()
+
+ def storageStatusList = storageStatusListener.storageStatusList
+
+ override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+ val eid = formatExecutorId(taskStart.taskInfo.executorId)
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+ }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+ val info = taskEnd.taskInfo
+ if (info != null) {
+ val eid = formatExecutorId(info.executorId)
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
+ executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+ case _ =>
+ executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+ }
+
+ // Update shuffle read/write
+ val metrics = taskEnd.taskMetrics
+ if (metrics != null) {
+ metrics.shuffleReadMetrics.foreach { shuffleRead =>
+ executorToShuffleRead(eid) =
+ executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
+ }
+ metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+ executorToShuffleWrite(eid) =
+ executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
+ }
+ }
+ }
+ }
+
+ // This addresses executor ID inconsistencies in local mode
+ private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
+}
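The listener's per-executor counters all use the same HashMap update idiom. One subtlety worth noting: onTaskEnd decrements the active count with getOrElse(eid, 1) - 1, so an end event with no matching start leaves the count at 0 rather than driving it negative. The idiom in isolation (a standalone sketch, runnable in the Scala REPL):

import scala.collection.mutable.HashMap

val active = HashMap[String, Int]()
def taskStart(eid: String) { active(eid) = active.getOrElse(eid, 0) + 1 }
// Defaulting to 1 means an unmatched end leaves the count at 0, not -1
def taskEnd(eid: String) { active(eid) = active.getOrElse(eid, 1) - 1 }

taskStart("0"); taskStart("0"); taskEnd("0")
assert(active("0") == 1)
taskEnd("1") // end event with no recorded start
assert(active("1") == 0)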
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
deleted file mode 100644
index 77a38a1..0000000
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.exec
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.collection.mutable.HashMap
-import scala.xml.Node
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.ExceptionFailure
-import org.apache.spark.scheduler._
-import org.apache.spark.storage.StorageStatusListener
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.Page.Executors
-import org.apache.spark.ui.{SparkUI, UIUtils}
-import org.apache.spark.util.Utils
-
-private[ui] class ExecutorsUI(parent: SparkUI) {
- private val basePath = parent.basePath
- private var _listener: Option[ExecutorsListener] = None
-
- private def appName = parent.appName
-
- lazy val listener = _listener.get
-
- def start() {
- _listener = Some(new ExecutorsListener(parent.storageStatusListener))
- }
-
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/executors",
- (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
- )
-
- def render(request: HttpServletRequest): Seq[Node] = {
- val storageStatusList = listener.storageStatusList
- val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
- val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
- val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
- val execInfoSorted = execInfo.sortBy(_.getOrElse("Executor ID", ""))
- val execTable = UIUtils.listingTable(execHeader, execRow, execInfoSorted)
-
- val content =
- <div class="row-fluid">
- <div class="span12">
- <ul class="unstyled">
- <li><strong>Memory:</strong>
- {Utils.bytesToString(memUsed)} Used
- ({Utils.bytesToString(maxMem)} Total) </li>
- <li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
- </ul>
- </div>
- </div>
- <div class = "row">
- <div class="span12">
- {execTable}
- </div>
- </div>;
-
- UIUtils.headerSparkPage(
- content, basePath, appName, "Executors (" + execInfo.size + ")", Executors)
- }
-
- /** Header fields for the executors table */
- private def execHeader = Seq(
- "Executor ID",
- "Address",
- "RDD Blocks",
- "Memory Used",
- "Disk Used",
- "Active Tasks",
- "Failed Tasks",
- "Complete Tasks",
- "Total Tasks",
- "Task Time",
- "Shuffle Read",
- "Shuffle Write")
-
- /** Render an HTML row representing an executor */
- private def execRow(values: Map[String, String]): Seq[Node] = {
- val maximumMemory = values("Maximum Memory")
- val memoryUsed = values("Memory Used")
- val diskUsed = values("Disk Used")
- <tr>
- <td>{values("Executor ID")}</td>
- <td>{values("Address")}</td>
- <td>{values("RDD Blocks")}</td>
- <td sorttable_customkey={memoryUsed}>
- {Utils.bytesToString(memoryUsed.toLong)} /
- {Utils.bytesToString(maximumMemory.toLong)}
- </td>
- <td sorttable_customkey={diskUsed}>
- {Utils.bytesToString(diskUsed.toLong)}
- </td>
- <td>{values("Active Tasks")}</td>
- <td>{values("Failed Tasks")}</td>
- <td>{values("Complete Tasks")}</td>
- <td>{values("Total Tasks")}</td>
- <td>{Utils.msDurationToString(values("Task Time").toLong)}</td>
- <td>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
- <td>{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
- </tr>
- }
-
- /** Represent an executor's info as a map given a storage status index */
- private def getExecInfo(statusId: Int): Map[String, String] = {
- val status = listener.storageStatusList(statusId)
- val execId = status.blockManagerId.executorId
- val hostPort = status.blockManagerId.hostPort
- val rddBlocks = status.blocks.size
- val memUsed = status.memUsed()
- val maxMem = status.maxMem
- val diskUsed = status.diskUsed()
- val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
- val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
- val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
- val totalTasks = activeTasks + failedTasks + completedTasks
- val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
- val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
- val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
-
- // Also include fields not in the header
- val execFields = execHeader ++ Seq("Maximum Memory")
-
- val execValues = Seq(
- execId,
- hostPort,
- rddBlocks,
- memUsed,
- diskUsed,
- activeTasks,
- failedTasks,
- completedTasks,
- totalTasks,
- totalDuration,
- totalShuffleRead,
- totalShuffleWrite,
- maxMem
- ).map(_.toString)
-
- execFields.zip(execValues).toMap
- }
-}
-
-/**
- * A SparkListener that prepares information to be displayed on the ExecutorsUI
- */
-private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
- extends SparkListener {
-
- val executorToTasksActive = HashMap[String, Int]()
- val executorToTasksComplete = HashMap[String, Int]()
- val executorToTasksFailed = HashMap[String, Int]()
- val executorToDuration = HashMap[String, Long]()
- val executorToShuffleRead = HashMap[String, Long]()
- val executorToShuffleWrite = HashMap[String, Long]()
-
- def storageStatusList = storageStatusListener.storageStatusList
-
- override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
- val eid = formatExecutorId(taskStart.taskInfo.executorId)
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
- }
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
- val info = taskEnd.taskInfo
- if (info != null) {
- val eid = formatExecutorId(info.executorId)
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
- executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
- taskEnd.reason match {
- case e: ExceptionFailure =>
- executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
- case _ =>
- executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
- }
-
- // Update shuffle read/write
- val metrics = taskEnd.taskMetrics
- if (metrics != null) {
- metrics.shuffleReadMetrics.foreach { shuffleRead =>
- executorToShuffleRead(eid) =
- executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
- }
- metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
- executorToShuffleWrite(eid) =
- executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
- }
- }
- }
- }
-
- // This addresses executor ID inconsistencies in the local mode
- private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 73861ae..c83e196 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -20,11 +20,12 @@ package org.apache.spark.ui.jobs
import scala.collection.mutable
import scala.xml.Node
+import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
/** Page showing executor summary */
-private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
- private lazy val listener = parent.listener
+private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
+ private val listener = parent.listener
def toNodeSeq: Seq[Node] = {
listener.synchronized {
@@ -69,7 +70,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
<tr>
<td>{k}</td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
- <td>{parent.formatDuration(v.taskTime)}</td>
+ <td>{UIUtils.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
deleted file mode 100644
index 8619a31..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.{Node, NodeSeq}
-
-import org.apache.spark.scheduler.Schedulable
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
-
-/** Page showing list of all ongoing and recently finished stages and pools */
-private[ui] class IndexPage(parent: JobProgressUI) {
- private val basePath = parent.basePath
- private val live = parent.live
- private val sc = parent.sc
- private lazy val listener = parent.listener
- private lazy val isFairScheduler = parent.isFairScheduler
-
- private def appName = parent.appName
-
- def render(request: HttpServletRequest): Seq[Node] = {
- listener.synchronized {
- val activeStages = listener.activeStages.values.toSeq
- val completedStages = listener.completedStages.reverse.toSeq
- val failedStages = listener.failedStages.reverse.toSeq
- val now = System.currentTimeMillis()
-
- val activeStagesTable =
- new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
- val completedStagesTable =
- new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
- val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
-
- // For now, pool information is only accessible in live UIs
- val pools = if (live) sc.getAllPools else Seq[Schedulable]()
- val poolTable = new PoolTable(pools, parent)
-
- val summary: NodeSeq =
- <div>
- <ul class="unstyled">
- {if (live) {
- // Total duration is not meaningful unless the UI is live
- <li>
- <strong>Total Duration: </strong>
- {parent.formatDuration(now - sc.startTime)}
- </li>
- }}
- <li>
- <strong>Scheduling Mode: </strong>
- {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
- </li>
- <li>
- <a href="#active"><strong>Active Stages:</strong></a>
- {activeStages.size}
- </li>
- <li>
- <a href="#completed"><strong>Completed Stages:</strong></a>
- {completedStages.size}
- </li>
- <li>
- <a href="#failed"><strong>Failed Stages:</strong></a>
- {failedStages.size}
- </li>
- </ul>
- </div>
-
- val content = summary ++
- {if (live && isFairScheduler) {
- <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
- } else {
- Seq[Node]()
- }} ++
- <h4 id="active">Active Stages ({activeStages.size})</h4> ++
- activeStagesTable.toNodeSeq ++
- <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
- completedStagesTable.toNodeSeq ++
- <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
- failedStagesTable.toNodeSeq
-
- UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", Stages)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 5167e20..0db4afa 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -222,12 +222,10 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
- val schedulingModeName =
- environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode")
- schedulingMode = schedulingModeName match {
- case Some(name) => Some(SchedulingMode.withName(name))
- case None => None
- }
+ schedulingMode = environmentUpdate
+ .environmentDetails("Spark Properties").toMap
+ .get("spark.scheduler.mode")
+ .map(SchedulingMode.withName)
}
}
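The rewrite of onEnvironmentUpdate is behavior-preserving: mapping SchedulingMode.withName over the Option is equivalent to the Some/None match it replaces, just shorter. For illustration (REPL sketch):

import org.apache.spark.scheduler.SchedulingMode

val name: Option[String] = Some("FAIR")
val viaMatch = name match {
  case Some(n) => Some(SchedulingMode.withName(n))
  case None => None
}
val viaMap = name.map(SchedulingMode.withName)
// Both are Some(FAIR) here, and both are None when name is None
assert(viaMatch == viaMap)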
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
new file mode 100644
index 0000000..34ff2ac
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, NodeSeq}
+
+import org.apache.spark.scheduler.Schedulable
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+
+/** Page showing list of all ongoing and recently finished stages and pools */
+private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
+ private val appName = parent.appName
+ private val basePath = parent.basePath
+ private val live = parent.live
+ private val sc = parent.sc
+ private val listener = parent.listener
+ private lazy val isFairScheduler = parent.isFairScheduler
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ listener.synchronized {
+ val activeStages = listener.activeStages.values.toSeq
+ val completedStages = listener.completedStages.reverse.toSeq
+ val failedStages = listener.failedStages.reverse.toSeq
+ val now = System.currentTimeMillis
+
+ val activeStagesTable =
+ new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
+ val completedStagesTable =
+ new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
+ val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
+
+ // For now, pool information is only accessible in live UIs
+ val pools = if (live) sc.getAllPools else Seq[Schedulable]()
+ val poolTable = new PoolTable(pools, parent)
+
+ val summary: NodeSeq =
+ <div>
+ <ul class="unstyled">
+ {if (live) {
+ // Total duration is not meaningful unless the UI is live
+ <li>
+ <strong>Total Duration: </strong>
+ {UIUtils.formatDuration(now - sc.startTime)}
+ </li>
+ }}
+ <li>
+ <strong>Scheduling Mode: </strong>
+ {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
+ </li>
+ <li>
+ <a href="#active"><strong>Active Stages:</strong></a>
+ {activeStages.size}
+ </li>
+ <li>
+ <a href="#completed"><strong>Completed Stages:</strong></a>
+ {completedStages.size}
+ </li>
+ <li>
+ <a href="#failed"><strong>Failed Stages:</strong></a>
+ {failedStages.size}
+ </li>
+ </ul>
+ </div>
+
+ val content = summary ++
+ {if (live && isFairScheduler) {
+ <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
+ } else {
+ Seq[Node]()
+ }} ++
+ <h4 id="active">Active Stages ({activeStages.size})</h4> ++
+ activeStagesTable.toNodeSeq ++
+ <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
+ completedStagesTable.toNodeSeq ++
+ <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
+ failedStagesTable.toNodeSeq
+
+ UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", parent.headerTabs, parent)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
new file mode 100644
index 0000000..3308c8c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.ui.{SparkUI, WebUITab}
+
+/** Web UI showing progress status of all jobs in the given SparkContext. */
+private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") {
+ val appName = parent.appName
+ val basePath = parent.basePath
+ val live = parent.live
+ val sc = parent.sc
+ val conf = if (live) sc.conf else new SparkConf
+ val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
+ val listener = new JobProgressListener(conf)
+
+ attachPage(new JobProgressPage(this))
+ attachPage(new StagePage(this))
+ attachPage(new PoolPage(this))
+ parent.registerListener(listener)
+
+ def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
+
+ def handleKillRequest(request: HttpServletRequest) = {
+ if (killEnabled) {
+ val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+ val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
+ if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
+ sc.cancelStage(stageId)
+ }
+ // Do a quick pause here to give Spark time to kill the stage so it shows up as
+ // killed after the refresh. Note that this will block the serving thread so the
+ // time should be limited in duration.
+ Thread.sleep(100)
+ }
+ }
+}
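handleKillRequest trusts its two query parameters but defaults both to safe values, so only an explicit, well-formed request (terminate=true with the id of a currently active stage) reaches sc.cancelStage. A hypothetical distillation of that guard logic, with the request object replaced by plain Options; shouldKill is an illustrative name, not part of this patch.

// Standalone sketch of the guard in handleKillRequest
def shouldKill(terminate: Option[String], id: Option[String],
    activeStages: Set[Int]): Option[Int] = {
  val killFlag = terminate.getOrElse("false").toBoolean
  val stageId = id.getOrElse("-1").toInt
  if (stageId >= 0 && killFlag && activeStages.contains(stageId)) Some(stageId) else None
}

assert(shouldKill(Some("true"), Some("3"), Set(3)) == Some(3))
assert(shouldKill(None, None, Set(3)).isEmpty)              // no parameters
assert(shouldKill(Some("true"), Some("7"), Set(3)).isEmpty) // stage not active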
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
deleted file mode 100644
index 30e3f35..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
-
-/** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobProgressUI(parent: SparkUI) {
- val basePath = parent.basePath
- val live = parent.live
- val sc = parent.sc
- val killEnabled = parent.conf.getBoolean("spark.ui.killEnabled", true)
-
- lazy val listener = _listener.get
- lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
-
- private val indexPage = new IndexPage(this)
- private val stagePage = new StagePage(this)
- private val poolPage = new PoolPage(this)
- private var _listener: Option[JobProgressListener] = None
-
- def appName = parent.appName
-
- def start() {
- val conf = if (live) sc.conf else new SparkConf
- _listener = Some(new JobProgressListener(conf))
- }
-
- def formatDuration(ms: Long) = Utils.msDurationToString(ms)
-
- private def handleKillRequest(request: HttpServletRequest) = {
- if (killEnabled) {
- val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
- val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
- if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
- sc.cancelStage(stageId)
- }
- // Do a quick pause here to give Spark time to kill the stage so it shows up as
- // killed after the refresh. Note that this will block the serving thread so the
- // time should be limited in duration.
- Thread.sleep(100)
- }
- }
-
- def getHandlers = Seq[ServletContextHandler](
- createRedirectHandler("/stages/stage/kill", "/stages", handleKillRequest),
- createServletHandler("/stages/stage",
- (request: HttpServletRequest) => stagePage.render(request), parent.securityManager, basePath),
- createServletHandler("/stages/pool",
- (request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
- createServletHandler("/stages",
- (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
- )
-}
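
The deleted JobProgressUI above is representative of the old scheme: each UI area owned a basePath, a lazily initialized listener, a start() hook, and a hand-rolled getHandlers sequence of Jetty servlets. In the modified pages below, that responsibility moves into WebUIPage subclasses that declare their own URL suffix. As a rough sketch of the surface those diffs rely on (the actual WebUIPage definition lives in another part of this patch, so the signature here is an assumption):

    import javax.servlet.http.HttpServletRequest
    import scala.xml.Node

    // Illustrative stand-in for org.apache.spark.ui.WebUIPage, mirroring only
    // what the page diffs below depend on: a URL prefix plus a render method.
    abstract class WebUIPageSketch(val prefix: String) {
      def render(request: HttpServletRequest): Seq[Node]
    }

    // A page now declares its own suffix ("pool", "stage", "rdd", or "" for
    // an index page) instead of being wired up by a parent's getHandlers.
    class ExamplePage extends WebUIPageSketch("example") {
      def render(request: HttpServletRequest): Seq[Node] = <p>example</p>
    }
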
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 3638e60..fd83d37 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -22,17 +22,15 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.scheduler.{Schedulable, StageInfo}
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
/** Page showing specific pool details */
-private[ui] class PoolPage(parent: JobProgressUI) {
+private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
+ private val appName = parent.appName
private val basePath = parent.basePath
private val live = parent.live
private val sc = parent.sc
- private lazy val listener = parent.listener
-
- private def appName = parent.appName
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
@@ -52,8 +50,8 @@ private[ui] class PoolPage(parent: JobProgressUI) {
<h4>Summary </h4> ++ poolTable.toNodeSeq ++
<h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq
- UIUtils.headerSparkPage(
- content, basePath, appName, "Fair Scheduler Pool: " + poolName, Stages)
+ UIUtils.headerSparkPage(content, basePath, appName, "Fair Scheduler Pool: " + poolName,
+ parent.headerTabs, parent)
}
}
}
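
Two changes in PoolPage recur throughout this patch: the listener becomes a plain val rather than a lazy val, and headerSparkPage takes parent.headerTabs and the owning tab instead of the old Page enum value. The lazy val was only needed because the old *UI classes created their listener in start(), after the pages were constructed. A minimal sketch of the initialization-order difference (class and field names are illustrative):

    // Old scheme: the listener did not exist until start() ran, so pages had
    // to defer the dereference with a lazy val.
    class OldStyleParent {
      private var _listener: Option[String] = None
      lazy val listener = _listener.get        // safe only because it is lazy
      def start(): Unit = { _listener = Some("ready") }
    }

    // New scheme: the parent tab constructs its listener up front, so a page
    // can capture it with an eager val.
    class NewStyleParent {
      val listener = "ready"
    }
    class PageSketch(parent: NewStyleParent) {
      private val listener = parent.listener   // eager val is now safe
    }
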
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index c5c8d86..f4b68f2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -24,10 +24,9 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo}
import org.apache.spark.ui.UIUtils
/** Table showing list of pools */
-private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
+private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
private val basePath = parent.basePath
- private val poolToActiveStages = listener.poolToActiveStages
- private lazy val listener = parent.listener
+ private val listener = parent.listener
def toNodeSeq: Seq[Node] = {
listener.synchronized {
@@ -48,7 +47,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
<th>SchedulingMode</th>
</thead>
<tbody>
- {rows.map(r => makeRow(r, poolToActiveStages))}
+ {rows.map(r => makeRow(r, listener.poolToActiveStages))}
</tbody>
</table>
}
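
PoolTable also stops caching listener.poolToActiveStages in a field and instead reads it through the listener at render time. Reading fresh is the safer contract: a cached reference is only equivalent if the listener never reassigns the field. A sketch of the hazard, with illustrative names:

    import scala.collection.mutable.HashMap

    class ListenerSketch {
      var poolToActiveStages = HashMap[String, Int]()  // may be replaced wholesale
    }

    class CachedTable(listener: ListenerSketch) {
      private val snapshot = listener.poolToActiveStages  // stale if the field is reassigned
      def size: Int = snapshot.size
    }

    class FreshTable(listener: ListenerSketch) {
      def size: Int = listener.poolToActiveStages.size    // always reads the current map
    }
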
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index b6c3e3c..4bce472 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -22,17 +22,14 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.{Utils, Distribution}
/** Page showing statistics and task list for a given stage */
-private[ui] class StagePage(parent: JobProgressUI) {
+private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
+ private val appName = parent.appName
private val basePath = parent.basePath
- private lazy val listener = parent.listener
- private lazy val sc = parent.sc
-
- private def appName = parent.appName
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
@@ -44,8 +41,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
<h4>Summary Metrics</h4> No tasks have started yet
<h4>Tasks</h4> No tasks have started yet
</div>
- return UIUtils.headerSparkPage(
- content, basePath, appName, "Details for Stage %s".format(stageId), Stages)
+ return UIUtils.headerSparkPage(content, basePath, appName,
+ "Details for Stage %s".format(stageId), parent.headerTabs, parent)
}
val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
@@ -60,7 +57,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
var activeTime = 0L
- val now = System.currentTimeMillis()
+ val now = System.currentTimeMillis
val tasksActive = listener.stageIdToTasksActive(stageId).values
tasksActive.foreach(activeTime += _.timeRunning(now))
@@ -70,7 +67,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
<ul class="unstyled">
<li>
<strong>Total task time across all tasks: </strong>
- {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+ {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
<li>
@@ -121,13 +118,13 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val serializationQuantiles =
"Result serialization time" +: Distribution(serializationTimes).
- get.getQuantiles().map(ms => parent.formatDuration(ms.toLong))
+ get.getQuantiles().map(ms => UIUtils.formatDuration(ms.toLong))
val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles()
- .map(ms => parent.formatDuration(ms.toLong))
+ .map(ms => UIUtils.formatDuration(ms.toLong))
val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
if (info.gettingResultTime > 0) {
@@ -138,7 +135,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val gettingResultQuantiles = "Time spent fetching task results" +:
Distribution(gettingResultTimes).get.getQuantiles().map { millis =>
- parent.formatDuration(millis.toLong)
+ UIUtils.formatDuration(millis.toLong)
}
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
@@ -155,7 +152,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val schedulerDelayQuantiles = "Scheduler delay" +:
Distribution(schedulerDelays).get.getQuantiles().map { millis =>
- parent.formatDuration(millis.toLong)
+ UIUtils.formatDuration(millis.toLong)
}
def getQuantileCols(data: Seq[Double]) =
@@ -206,8 +203,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
<h4>Tasks</h4> ++ taskTable
- UIUtils.headerSparkPage(
- content, basePath, appName, "Details for Stage %d".format(stageId), Stages)
+ UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId),
+ parent.headerTabs, parent)
}
}
@@ -219,8 +216,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
taskData match { case TaskUIData(info, metrics, exception) =>
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(_.executorRunTime).getOrElse(1L)
- val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
- else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+ val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
+ else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
@@ -235,8 +232,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
- val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms =>
- if (ms == 0) "" else parent.formatDuration(ms)
+ val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
+ if (ms == 0) "" else UIUtils.formatDuration(ms)
}.getOrElse("")
val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
@@ -254,15 +251,15 @@ private[ui] class StagePage(parent: JobProgressUI) {
<td>{info.status}</td>
<td>{info.taskLocality}</td>
<td>{info.host}</td>
- <td>{WebUI.formatDate(new Date(info.launchTime))}</td>
+ <td>{UIUtils.formatDate(new Date(info.launchTime))}</td>
<td sorttable_customkey={duration.toString}>
{formatDuration}
</td>
<td sorttable_customkey={gcTime.toString}>
- {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+ {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
<td sorttable_customkey={serializationTime.toString}>
- {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+ {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
</td>
{if (shuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
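
The summary-metrics rows in StagePage all follow one pattern: take a per-task metric, compute its quantiles with util.Distribution, and format each quantile with the now-centralized UIUtils.formatDuration. A dependency-free sketch of that pattern (the quantile helper and formatDuration below are simplified stand-ins, not Spark's implementations):

    // Nearest-rank quantiles at the conventional five probabilities.
    def quantiles(data: Seq[Double],
                  probs: Seq[Double] = Seq(0, 0.25, 0.5, 0.75, 1.0)): Seq[Double] = {
      val sorted = data.sorted
      probs.map(p => sorted(((sorted.length - 1) * p).toInt))
    }

    def formatDuration(ms: Long): String = s"$ms ms"   // trivial stand-in

    val serviceTimes = Seq(120.0, 340.0, 95.0, 800.0, 430.0)  // fake task run times
    val serviceQuantiles =
      "Duration" +: quantiles(serviceTimes).map(ms => formatDuration(ms.toLong))
    // -> Seq("Duration", "95 ms", "120 ms", "340 ms", "430 ms", "800 ms")
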
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index e419fae..8c5b1f5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -23,17 +23,17 @@ import scala.collection.mutable.HashMap
import scala.xml.Node
import org.apache.spark.scheduler.{StageInfo, TaskInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTable(
- stages: Seq[StageInfo],
- parent: JobProgressUI,
- killEnabled: Boolean = false) {
+ stages: Seq[StageInfo],
+ parent: JobProgressTab,
+ killEnabled: Boolean = false) {
private val basePath = parent.basePath
- private lazy val listener = parent.listener
+ private val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
def toNodeSeq: Seq[Node] = {
@@ -89,25 +89,23 @@ private[ui] class StageTable(
{s.name}
</a>
- val description = listener.stageIdToDescription.get(s.stageId)
+ listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
.getOrElse(<div> {killLink}{nameLink}</div>)
-
- return description
}
/** Render an HTML row that represents a stage */
private def stageRow(s: StageInfo): Seq[Node] = {
val poolName = listener.stageIdToPool.get(s.stageId)
val submissionTime = s.submissionTime match {
- case Some(t) => WebUI.formatDate(new Date(t))
+ case Some(t) => UIUtils.formatDate(new Date(t))
case None => "Unknown"
}
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
val duration = s.submissionTime.map { t =>
if (finishTime > t) finishTime - t else System.currentTimeMillis - t
}
- val formattedDuration = duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")
+ val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
val startedTasks =
listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
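
In StageTable's description builder, the diff drops the intermediate val and the explicit return, leaving the map/getOrElse expression as the method's result. In Scala the last expression of a block is its value, so the return was redundant; explicit returns are also a well-known source of surprises if they later migrate into a closure. A minimal illustration:

    def describe(d: Option[String]): String = {
      d.map(s => s"desc: $s")
       .getOrElse("no description")   // tail expression; no `return` needed
    }
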
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
deleted file mode 100644
index 16996a2..0000000
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.storage
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.collection.mutable
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.scheduler._
-import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
-
-/** Web UI showing storage status of all RDD's in the given SparkContext. */
-private[ui] class BlockManagerUI(parent: SparkUI) {
- val basePath = parent.basePath
-
- private val indexPage = new IndexPage(this)
- private val rddPage = new RDDPage(this)
- private var _listener: Option[BlockManagerListener] = None
-
- lazy val listener = _listener.get
-
- def appName = parent.appName
-
- def start() {
- _listener = Some(new BlockManagerListener(parent.storageStatusListener))
- }
-
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/storage/rdd",
- (request: HttpServletRequest) => rddPage.render(request), parent.securityManager, basePath),
- createServletHandler("/storage",
- (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
- )
-}
-
-/**
- * A SparkListener that prepares information to be displayed on the BlockManagerUI
- */
-private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListener)
- extends SparkListener {
-
- private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
-
- def storageStatusList = storageStatusListener.storageStatusList
-
- /** Filter RDD info to include only those with cached partitions */
- def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
-
- /** Update each RDD's info to reflect any updates to the RDD's storage status */
- private def updateRDDInfo() {
- val rddInfos = _rddInfoMap.values.toSeq
- val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
- updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
- }
-
- /**
- * Assumes the storage status list is fully up-to-date. This implies the corresponding
- * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
- */
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
- val metrics = taskEnd.taskMetrics
- if (metrics != null && metrics.updatedBlocks.isDefined) {
- updateRDDInfo()
- }
- }
-
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
- val rddInfo = stageSubmitted.stageInfo.rddInfo
- _rddInfoMap.getOrElseUpdate(rddInfo.id, rddInfo)
- }
-
- override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
- // Remove all partitions that are no longer cached
- _rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 }
- }
-
- override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
- updateRDDInfo()
- }
-}
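
BlockManagerUI is deleted along with its listener; per the rest of this patch, the storage pages now hang off a StorageTab (see RddPage and StoragePage below). The listener's bookkeeping pattern is worth noting: register RDDs on stage submission, prune uncached ones on stage completion, and expose only entries with cached partitions. A self-contained sketch of that pattern, stripped of the SparkListener plumbing (RddInfoSketch stands in for org.apache.spark.storage.RDDInfo):

    import scala.collection.mutable

    case class RddInfoSketch(id: Int, name: String, numCachedPartitions: Int)

    class StorageBookkeeping {
      private val rddInfoMap = mutable.Map[Int, RddInfoSketch]()

      def onStageSubmitted(info: RddInfoSketch): Unit = synchronized {
        rddInfoMap.getOrElseUpdate(info.id, info)  // register; never overwrite
      }

      def onStageCompleted(): Unit = synchronized {
        // Remove all RDDs that no longer have any cached partitions
        rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 }
      }

      /** Only RDDs with cached partitions are worth listing on the page. */
      def rddInfoList: Seq[RddInfoSketch] =
        rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
    }
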
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
deleted file mode 100644
index 4f6acc3..0000000
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.storage
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
-import org.apache.spark.util.Utils
-
-/** Page showing list of RDD's currently stored in the cluster */
-private[ui] class IndexPage(parent: BlockManagerUI) {
- private val basePath = parent.basePath
- private lazy val listener = parent.listener
-
- private def appName = parent.appName
-
- def render(request: HttpServletRequest): Seq[Node] = {
-
- val rdds = listener.rddInfoList
- val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
- UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage)
- }
-
- /** Header fields for the RDD table */
- private def rddHeader = Seq(
- "RDD Name",
- "Storage Level",
- "Cached Partitions",
- "Fraction Cached",
- "Size in Memory",
- "Size in Tachyon",
- "Size on Disk")
-
- /** Render an HTML row representing an RDD */
- private def rddRow(rdd: RDDInfo): Seq[Node] = {
- <tr>
- <td>
- <a href={"%s/storage/rdd?id=%s".format(UIUtils.prependBaseUri(basePath), rdd.id)}>
- {rdd.name}
- </a>
- </td>
- <td>{rdd.storageLevel.description}
- </td>
- <td>{rdd.numCachedPartitions}</td>
- <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
- <td>{Utils.bytesToString(rdd.memSize)}</td>
- <td>{Utils.bytesToString(rdd.tachyonSize)}</td>
- <td>{Utils.bytesToString(rdd.diskSize)}</td>
- </tr>
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 75ee997..d07f1c9 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -22,23 +22,22 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils}
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
/** Page showing storage details for a given RDD */
-private[ui] class RDDPage(parent: BlockManagerUI) {
+private[ui] class RddPage(parent: StorageTab) extends WebUIPage("rdd") {
+ private val appName = parent.appName
private val basePath = parent.basePath
- private lazy val listener = parent.listener
-
- private def appName = parent.appName
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
val rddId = request.getParameter("id").toInt
val storageStatusList = listener.storageStatusList
val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
// Rather than crashing, render an "RDD Not Found" page
- return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", Storage)
+ return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found",
+ parent.headerTabs, parent)
}
// Worker table
@@ -96,8 +95,8 @@ private[ui] class RDDPage(parent: BlockManagerUI) {
</div>
</div>;
- UIUtils.headerSparkPage(
- content, basePath, appName, "RDD Storage Info for " + rddInfo.name, Storage)
+ UIUtils.headerSparkPage(content, basePath, appName, "RDD Storage Info for " + rddInfo.name,
+ parent.headerTabs, parent)
}
/** Header fields for the worker table */
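
RddPage now renders an "RDD Not Found" page for an unknown id rather than crashing, though request.getParameter("id").toInt can still throw if the parameter is missing or non-numeric. For contrast, a hedged sketch of a fully defensive parse; this is the general idiom, not what the patch itself does:

    def parseRddId(raw: String): Option[Int] =
      Option(raw).flatMap(s => scala.util.Try(s.toInt).toOption)

    parseRddId("7")     // Some(7)
    parseRddId(null)    // None: parameter absent
    parseRddId("abc")   // None: not a number
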
http://git-wip-us.apache.org/repos/asf/spark/blob/6aa08c39/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
new file mode 100644
index 0000000..b66edd9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+/** Page showing list of RDD's currently stored in the cluster */
+private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
+ private val appName = parent.appName
+ private val basePath = parent.basePath
+ private val listener = parent.listener
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val rdds = listener.rddInfoList
+ val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
+ UIUtils.headerSparkPage(content, basePath, appName, "Storage ", parent.headerTabs, parent)
+ }
+
+ /** Header fields for the RDD table */
+ private def rddHeader = Seq(
+ "RDD Name",
+ "Storage Level",
+ "Cached Partitions",
+ "Fraction Cached",
+ "Size in Memory",
+ "Size in Tachyon",
+ "Size on Disk")
+
+ /** Render an HTML row representing an RDD */
+ private def rddRow(rdd: RDDInfo): Seq[Node] = {
+ <tr>
+ <td>
+ <a href={"%s/storage/rdd?id=%s".format(UIUtils.prependBaseUri(basePath), rdd.id)}>
+ {rdd.name}
+ </a>
+ </td>
+ <td>{rdd.storageLevel.description}
+ </td>
+ <td>{rdd.numCachedPartitions}</td>
+ <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
+ <td>{Utils.bytesToString(rdd.memSize)}</td>
+ <td>{Utils.bytesToString(rdd.tachyonSize)}</td>
+ <td>{Utils.bytesToString(rdd.diskSize)}</td>
+ </tr>
+ }
+}
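
StoragePage (effectively the renamed storage IndexPage deleted above) builds its table with UIUtils.listingTable, which pairs a header row with a row-rendering function over the data. A self-contained sketch of that generic idiom; the real helper in UIUtils also attaches CSS classes and sorting support, which this omits:

    import scala.xml.Node

    def listingTableSketch[T](headers: Seq[String],
                              makeRow: T => Seq[Node],
                              rows: Seq[T]): Seq[Node] = {
      <table>
        <thead>{headers.map(h => <th>{h}</th>)}</thead>
        <tbody>{rows.flatMap(makeRow)}</tbody>
      </table>
    }

    // Usage: render (name, size) pairs the way rddRow renders RDDInfo rows.
    val demo = listingTableSketch[(String, Long)](
      Seq("Name", "Size"),
      { case (name, size) => <tr><td>{name}</td><td>{size}</td></tr> },
      Seq(("rdd_1", 1024L), ("rdd_2", 2048L)))
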