Posted to commits@spark.apache.org by zs...@apache.org on 2020/01/29 21:44:34 UTC

[spark] branch master updated: [SPARK-29543][SS][UI] Structured Streaming Web UI

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7173786  [SPARK-29543][SS][UI] Structured Streaming Web UI
7173786 is described below

commit 71737861531180bbda9aec8d241b1428fe91cab2
Author: uncleGen <hu...@gmail.com>
AuthorDate: Wed Jan 29 13:43:51 2020 -0800

    [SPARK-29543][SS][UI] Structured Streaming Web UI
    
    ### What changes were proposed in this pull request?
    
    This PR adds two pages to the Web UI for Structured Streaming:
      - "/streamingquery": Streaming Query Page, providing aggregate information for running/completed streaming queries.
      - "/streamingquery/statistics": Streaming Query Statistics Page, providing detailed information for a streaming query, including `Input Rate`, `Process Rate`, `Input Rows`, `Batch Duration` and `Operation Duration`.
    
    ![Screen Shot 2020-01-29 at 1 38 00 PM](https://user-images.githubusercontent.com/1000778/73399837-cd01cc80-429c-11ea-9d4b-1d200a41b8d5.png)
    ![Screen Shot 2020-01-29 at 1 39 16 PM](https://user-images.githubusercontent.com/1000778/73399838-cd01cc80-429c-11ea-8185-4e56db6866bd.png)
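    
    For illustration only (not part of this patch), a minimal Scala sketch that starts a
    query against the built-in `rate` source so the two pages above have data to show;
    the master, app name, and rows-per-second value are arbitrary assumptions:
    
        import org.apache.spark.sql.SparkSession
    
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("ss-ui-demo")  // hypothetical app name
          .getOrCreate()
    
        // A trivial always-on query; its progress events feed the new UI pages.
        val query = spark.readStream
          .format("rate")
          .option("rowsPerSecond", "10")
          .load()
          .writeStream
          .format("console")
          .start()
    
        query.awaitTermination()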
    
    ### Why are the changes needed?
    
    It helps users better monitor Structured Streaming queries.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    - newly added and existing UTs
    - manual test
    
    Closes #26201 from uncleGen/SPARK-29543.
    
    Lead-authored-by: uncleGen <hu...@gmail.com>
    Co-authored-by: Yuanjian Li <xy...@gmail.com>
    Co-authored-by: Genmao Yu <hu...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 .../org/apache/spark}/ui/static/streaming-page.css |   0
 .../org/apache/spark}/ui/static/streaming-page.js  |   0
 .../spark/ui/static/structured-streaming-page.js   | 171 +++++++++++++
 .../resources/org/apache/spark/ui/static/webui.js  |   2 +
 .../scala/org/apache/spark/ui/GraphUIData.scala    | 169 +++++++++++++
 .../main/scala/org/apache/spark/ui/UIUtils.scala   |  91 +++++++
 .../scala/org/apache/spark/ui/jobs/StagePage.scala |  14 +-
 .../org/apache/spark/ui/jobs/StageTable.scala      |  14 +-
 project/MimaExcludes.scala                         |   5 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  16 ++
 .../sql/execution/streaming/ProgressReporter.scala |   5 +-
 .../sql/execution/streaming/StreamExecution.scala  |   3 +-
 .../apache/spark/sql/internal/SharedState.scala    |  19 +-
 .../sql/streaming/StreamingQueryListener.scala     |   4 +-
 .../sql/streaming/StreamingQueryManager.scala      |   6 +-
 .../org/apache/spark/sql/streaming/progress.scala  |   2 +
 .../sql/streaming/ui/StreamingQueryPage.scala      | 147 +++++++++++
 .../ui/StreamingQueryStatisticsPage.scala          | 271 +++++++++++++++++++++
 .../ui/StreamingQueryStatusListener.scala          | 122 ++++++++++
 .../spark/sql/streaming/ui/StreamingQueryTab.scala |  33 +--
 .../apache/spark/sql/streaming/ui/UIUtils.scala    |  60 +++++
 .../streaming/StreamingQueryListenerSuite.scala    |  10 +-
 .../StreamingQueryStatusAndProgressSuite.scala     |   2 +
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  14 +-
 .../sql/streaming/ui/StreamingQueryPageSuite.scala | 125 ++++++++++
 .../ui/StreamingQueryStatusListenerSuite.scala     | 101 ++++++++
 .../spark/sql/streaming/ui/UIUtilsSuite.scala      |  41 ++++
 .../hive/thriftserver/ui/ThriftServerPage.scala    |  16 +-
 .../apache/spark/streaming/dstream/DStream.scala   |   4 +-
 .../spark/streaming/scheduler/JobScheduler.scala   |   4 +-
 .../spark/streaming/ui/AllBatchesTable.scala       |   2 +-
 .../org/apache/spark/streaming/ui/BatchPage.scala  |   2 +-
 .../apache/spark/streaming/ui/StreamingPage.scala  | 125 +---------
 .../apache/spark/streaming/ui/StreamingTab.scala   |   2 +-
 .../org/apache/spark/streaming/ui/UIUtils.scala    |  71 +-----
 .../apache/spark/streaming/DStreamScopeSuite.scala |   6 +-
 .../apache/spark/streaming/ui/UIUtilsSuite.scala   |  12 +-
 37 files changed, 1408 insertions(+), 283 deletions(-)

diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.css b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
similarity index 100%
rename from streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.css
rename to core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
similarity index 100%
rename from streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
rename to core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
diff --git a/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
new file mode 100644
index 0000000..70250fd
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// pre-define some colors for legends.
+var colorPool = ["#F8C471", "#F39C12", "#B9770E", "#73C6B6", "#16A085", "#117A65", "#B2BABB", "#7F8C8D", "#616A6B"];
+
+function drawAreaStack(id, labels, values, minX, maxX, minY, maxY) {
+    d3.select(d3.select(id).node().parentNode)
+        .style("padding", "8px 0 8px 8px")
+        .style("border-right", "0px solid white");
+
+    // Setup svg using Bostock's margin convention
+    var margin = {top: 20, right: 40, bottom: 30, left: maxMarginLeftForTimeline};
+    var width = 850 - margin.left - margin.right;
+    var height = 300 - margin.top - margin.bottom;
+
+    var svg = d3.select(id)
+        .append("svg")
+        .attr("width", width + margin.left + margin.right)
+        .attr("height", height + margin.top + margin.bottom)
+        .append("g")
+        .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
+
+    var data = values;
+
+    var parse = d3.time.format("%H:%M:%S.%L").parse;
+
+    // Transpose the data into layers
+    var dataset = d3.layout.stack()(labels.map(function(fruit) {
+        return data.map(function(d) {
+            return {_x: d.x, x: parse(d.x), y: +d[fruit]};
+        });
+    }));
+
+
+    // Set x, y and colors
+    var x = d3.scale.ordinal()
+        .domain(dataset[0].map(function(d) { return d.x; }))
+        .rangeRoundBands([10, width-10], 0.02);
+
+    var y = d3.scale.linear()
+        .domain([0, d3.max(dataset, function(d) {  return d3.max(d, function(d) { return d.y0 + d.y; });  })])
+        .range([height, 0]);
+
+    var colors = colorPool.slice(0, labels.length)
+
+    // Define and draw axes
+    var yAxis = d3.svg.axis()
+        .scale(y)
+        .orient("left")
+        .ticks(7)
+        .tickFormat( function(d) { return d } );
+
+    var xAxis = d3.svg.axis()
+        .scale(x)
+        .orient("bottom")
+        .tickFormat(d3.time.format("%H:%M:%S.%L"));
+
+    // Only show the first and last time in the graph
+    var xline = []
+    xline.push(x.domain()[0])
+    xline.push(x.domain()[x.domain().length - 1])
+    xAxis.tickValues(xline);
+
+    svg.append("g")
+        .attr("class", "y axis")
+        .call(yAxis)
+        .append("text")
+            .attr("transform", "translate(0," + unitLabelYOffset + ")")
+            .text("ms");
+
+    svg.append("g")
+        .attr("class", "x axis")
+        .attr("transform", "translate(0," + height + ")")
+        .call(xAxis);
+
+    // Create groups for each series, rects for each segment
+    var groups = svg.selectAll("g.cost")
+        .data(dataset)
+        .enter().append("g")
+        .attr("class", "cost")
+        .style("fill", function(d, i) { return colors[i]; });
+
+    var rect = groups.selectAll("rect")
+        .data(function(d) { return d; })
+        .enter()
+        .append("rect")
+        .attr("x", function(d) { return x(d.x); })
+        .attr("y", function(d) { return y(d.y0 + d.y); })
+        .attr("height", function(d) { return y(d.y0) - y(d.y0 + d.y); })
+        .attr("width", x.rangeBand())
+        .on('mouseover', function(d) {
+            var tip = '';
+            var idx = 0;
+            var _values = timeToValues[d._x]
+            _values.forEach(function (k) {
+                tip += labels[idx] + ': ' + k + '   ';
+                idx += 1;
+            });
+            tip += " at " + d._x
+            showBootstrapTooltip(d3.select(this).node(), tip);
+        })
+        .on('mouseout',  function() {
+            hideBootstrapTooltip(d3.select(this).node());
+        })
+        .on("mousemove", function(d) {
+            var xPosition = d3.mouse(this)[0] - 15;
+            var yPosition = d3.mouse(this)[1] - 25;
+            tooltip.attr("transform", "translate(" + xPosition + "," + yPosition + ")");
+            tooltip.select("text").text(d.y);
+        });
+
+
+    // Draw legend
+    var legend = svg.selectAll(".legend")
+        .data(colors)
+        .enter().append("g")
+        .attr("class", "legend")
+        .attr("transform", function(d, i) { return "translate(30," + i * 19 + ")"; });
+
+    legend.append("rect")
+        .attr("x", width - 20)
+        .attr("width", 18)
+        .attr("height", 18)
+        .style("fill", function(d, i) {return colors.slice().reverse()[i];})
+        .on('mouseover', function(d, i) {
+            var len = labels.length
+            showBootstrapTooltip(d3.select(this).node(), labels[len - 1 - i]);
+        })
+        .on('mouseout',  function() {
+            hideBootstrapTooltip(d3.select(this).node());
+        })
+        .on("mousemove", function(d) {
+            var xPosition = d3.mouse(this)[0] - 15;
+            var yPosition = d3.mouse(this)[1] - 25;
+            tooltip.attr("transform", "translate(" + xPosition + "," + yPosition + ")");
+            tooltip.select("text").text(d.y);
+        });
+
+    // Prep the tooltip bits, initial display is hidden
+    var tooltip = svg.append("g")
+        .attr("class", "tooltip")
+        .style("display", "none");
+
+    tooltip.append("rect")
+        .attr("width", 30)
+        .attr("height", 20)
+        .attr("fill", "white")
+        .style("opacity", 0.5);
+
+    tooltip.append("text")
+        .attr("x", 15)
+        .attr("dy", "1.2em")
+        .style("text-anchor", "middle")
+        .attr("font-size", "12px")
+        .attr("font-weight", "bold");
+}
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js
index fac464e..0ba461f 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -90,6 +90,8 @@ $(function() {
   collapseTablePageLoad('collapse-aggregated-sessionstat','aggregated-sessionstat');
   collapseTablePageLoad('collapse-aggregated-sqlstat','aggregated-sqlstat');
   collapseTablePageLoad('collapse-aggregated-sqlsessionstat','aggregated-sqlsessionstat');
+  collapseTablePageLoad('collapse-aggregated-activeQueries','aggregated-activeQueries');
+  collapseTablePageLoad('collapse-aggregated-completedQueries','aggregated-completedQueries');
 });
 
 $(function() {
diff --git a/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala b/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala
new file mode 100644
index 0000000..87ff677
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import java.{util => ju}
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.{Node, Unparsed}
+
+/**
+ * A helper class to generate JavaScript and HTML for both timeline and histogram graphs.
+ *
+ * @param timelineDivId the timeline `id` used in the html `div` tag
+ * @param histogramDivId the histogram `id` used in the html `div` tag
+ * @param data the data for the graph
+ * @param minX the min value of X axis
+ * @param maxX the max value of X axis
+ * @param minY the min value of Y axis
+ * @param maxY the max value of Y axis
+ * @param unitY the unit of Y axis
+ * @param batchInterval if `batchInterval` is not None, we will draw a line for `batchInterval` in
+ *                      the graph
+ */
+private[spark] class GraphUIData(
+    timelineDivId: String,
+    histogramDivId: String,
+    data: Seq[(Long, Double)],
+    minX: Long,
+    maxX: Long,
+    minY: Double,
+    maxY: Double,
+    unitY: String,
+    batchInterval: Option[Double] = None) {
+
+  private var dataJavaScriptName: String = _
+
+  def generateDataJs(jsCollector: JsCollector): Unit = {
+    val jsForData = data.map { case (x, y) =>
+      s"""{"x": $x, "y": $y}"""
+    }.mkString("[", ",", "]")
+    dataJavaScriptName = jsCollector.nextVariableName
+    jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;")
+  }
+
+  def generateTimelineHtml(jsCollector: JsCollector): Seq[Node] = {
+    jsCollector.addPreparedStatement(s"registerTimeline($minY, $maxY);")
+    if (batchInterval.isDefined) {
+      jsCollector.addStatement(
+        "drawTimeline(" +
+          s"'#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY, '$unitY'," +
+          s" ${batchInterval.get}" +
+          ");")
+    } else {
+      jsCollector.addStatement(
+        s"drawTimeline('#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY," +
+          s" '$unitY');")
+    }
+    <div id={timelineDivId}></div>
+  }
+
+  def generateHistogramHtml(jsCollector: JsCollector): Seq[Node] = {
+    val histogramData = s"$dataJavaScriptName.map(function(d) { return d.y; })"
+    jsCollector.addPreparedStatement(s"registerHistogram($histogramData, $minY, $maxY);")
+    if (batchInterval.isDefined) {
+      jsCollector.addStatement(
+        "drawHistogram(" +
+          s"'#$histogramDivId', $histogramData, $minY, $maxY, '$unitY', ${batchInterval.get}" +
+          ");")
+    } else {
+      jsCollector.addStatement(
+        s"drawHistogram('#$histogramDivId', $histogramData, $minY, $maxY, '$unitY');")
+    }
+    <div id={histogramDivId}></div>
+  }
+
+  def generateAreaStackHtmlWithData(
+      jsCollector: JsCollector,
+      values: Array[(Long, ju.Map[String, JLong])]): Seq[Node] = {
+    val operationLabels = values.flatMap(_._2.keySet().asScala).toSet
+    val durationDataPadding = UIUtils.durationDataPadding(values)
+    val jsForData = durationDataPadding.map { case (x, y) =>
+      val s = y.toSeq.sortBy(_._1).map(e => s""""${e._1}": "${e._2}"""").mkString(",")
+      s"""{x: "${UIUtils.formatBatchTime(x, 1, showYYYYMMSS = false)}", $s}"""
+    }.mkString("[", ",", "]")
+    val jsForLabels = operationLabels.toSeq.sorted.mkString("[\"", "\",\"", "\"]")
+
+    val (maxX, minX, maxY, minY) = if (values != null && values.length > 0) {
+      val xValues = values.map(_._1.toLong)
+      val yValues = values.map(_._2.asScala.toSeq.map(_._2.toLong).sum)
+      (xValues.max, xValues.min, yValues.max, yValues.min)
+    } else {
+      (0L, 0L, 0L, 0L)
+    }
+
+    dataJavaScriptName = jsCollector.nextVariableName
+    jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;")
+    val labels = jsCollector.nextVariableName
+    jsCollector.addPreparedStatement(s"var $labels = $jsForLabels;")
+    jsCollector.addStatement(
+      s"drawAreaStack('#$timelineDivId', $labels, $dataJavaScriptName, $minX, $maxX, $minY, $maxY)")
+    <div id={timelineDivId}></div>
+  }
+}
+
+/**
+ * A helper class that allows the user to add JavaScript statements which will be executed when the
+ * DOM has finished loading.
+ */
+private[spark] class JsCollector {
+
+  private var variableId = 0
+
+  /**
+   * Return the next unused JavaScript variable name
+   */
+  def nextVariableName: String = {
+    variableId += 1
+    "v" + variableId
+  }
+
+  /**
+   * JavaScript statements that will execute before `statements`
+   */
+  private val preparedStatements = ArrayBuffer[String]()
+
+  /**
+   * JavaScript statements that will execute after `preparedStatements`
+   */
+  private val statements = ArrayBuffer[String]()
+
+  def addPreparedStatement(js: String): Unit = {
+    preparedStatements += js
+  }
+
+  def addStatement(js: String): Unit = {
+    statements += js
+  }
+
+  /**
+   * Generate a html snippet that will execute all scripts when the DOM has finished loading.
+   */
+  def toHtml: Seq[Node] = {
+    val js =
+      s"""
+         |$$(document).ready(function() {
+         |    ${preparedStatements.mkString("\n")}
+         |    ${statements.mkString("\n")}
+         |});""".stripMargin
+
+    <script>{Unparsed(js)}</script>
+  }
+}
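
For illustration only (not part of this patch), a hedged sketch of how these
private[spark] helpers fit together, mirroring their use in
StreamingQueryStatisticsPage further below; the div ids and data points are made up:

    import org.apache.spark.ui.{GraphUIData, JsCollector}

    val jsCollector = new JsCollector
    val graph = new GraphUIData(
      timelineDivId = "input-rate-timeline",
      histogramDivId = "input-rate-histogram",
      data = Seq((0L, 1.0), (1000L, 2.0)),  // hypothetical (timeMs, value) points
      minX = 0L, maxX = 1000L, minY = 0.0, maxY = 2.0,
      unitY = "records/sec")
    graph.generateDataJs(jsCollector)  // registers the data as a JS variable first
    val html = graph.generateTimelineHtml(jsCollector) ++  // <div> plus queued draw call
      graph.generateHistogramHtml(jsCollector) ++
      jsCollector.toHtml  // one <script> that runs all queued statements on DOM ready
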
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 143303d..94c4521 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ui
 
+import java.{util => ju}
+import java.lang.{Long => JLong}
 import java.net.URLDecoder
 import java.nio.charset.StandardCharsets.UTF_8
 import java.text.SimpleDateFormat
@@ -24,6 +26,7 @@ import java.util.{Date, Locale, TimeZone}
 import javax.servlet.http.HttpServletRequest
 import javax.ws.rs.core.{MediaType, Response}
 
+import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 import scala.xml._
 import scala.xml.transform.{RewriteRule, RuleTransformer}
@@ -119,6 +122,59 @@ private[spark] object UIUtils extends Logging {
     }
   }
 
+  // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+  private val batchTimeFormat = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue(): SimpleDateFormat =
+      new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.US)
+  }
+
+  private val batchTimeFormatWithMilliseconds = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue(): SimpleDateFormat =
+      new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US)
+  }
+
+  /**
+   * If `batchInterval` is less than 1 second, format `batchTime` with milliseconds. Otherwise,
+   * format `batchTime` without milliseconds.
+   *
+   * @param batchTime the batch time to be formatted
+   * @param batchInterval the batch interval
+   * @param showYYYYMMSS whether to show the `yyyy/MM/dd` part. If false, the return value will be
+   *                     only `HH:mm:ss` or `HH:mm:ss.SSS` depending on `batchInterval`
+   * @param timezone only for test
+   */
+  def formatBatchTime(
+      batchTime: Long,
+      batchInterval: Long,
+      showYYYYMMSS: Boolean = true,
+      timezone: TimeZone = null): String = {
+    val oldTimezones =
+      (batchTimeFormat.get.getTimeZone, batchTimeFormatWithMilliseconds.get.getTimeZone)
+    if (timezone != null) {
+      batchTimeFormat.get.setTimeZone(timezone)
+      batchTimeFormatWithMilliseconds.get.setTimeZone(timezone)
+    }
+    try {
+      val formattedBatchTime =
+        if (batchInterval < 1000) {
+          batchTimeFormatWithMilliseconds.get.format(batchTime)
+        } else {
+          // If batchInterval >= 1 second, don't show milliseconds
+          batchTimeFormat.get.format(batchTime)
+        }
+      if (showYYYYMMSS) {
+        formattedBatchTime
+      } else {
+        formattedBatchTime.substring(formattedBatchTime.indexOf(' ') + 1)
+      }
+    } finally {
+      if (timezone != null) {
+        batchTimeFormat.get.setTimeZone(oldTimezones._1)
+        batchTimeFormatWithMilliseconds.get.setTimeZone(oldTimezones._2)
+      }
+    }
+  }
+
   /** Generate a human-readable string representing a number (e.g. 100 K) */
   def formatNumber(records: Double): String = {
     val trillion = 1e12
@@ -572,4 +628,39 @@ private[spark] object UIUtils extends Logging {
   def buildErrorResponse(status: Response.Status, msg: String): Response = {
     Response.status(status).entity(msg).`type`(MediaType.TEXT_PLAIN).build()
   }
+
+  /**
+   * Each batch may carry a different set of duration labels, so we pad missing
+   * labels with '0d' to avoid UI rendering errors.
+   */
+  def durationDataPadding(
+      values: Array[(Long, ju.Map[String, JLong])]): Array[(Long, Map[String, Double])] = {
+    val operationLabels = values.flatMap(_._2.keySet().asScala).toSet
+    values.map { case (xValue, yValue) =>
+      val dataPadding = operationLabels.map { opLabel =>
+        if (yValue.containsKey(opLabel)) {
+          (opLabel, yValue.get(opLabel).toDouble)
+        } else {
+          (opLabel, 0d)
+        }
+      }
+      (xValue, dataPadding.toMap)
+    }
+  }
+
+  def detailsUINode(isMultiline: Boolean, message: String): Seq[Node] = {
+    if (isMultiline) {
+      // scalastyle:off
+      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+        <div class="stacktrace-details collapsed">
+          <pre>{message}</pre>
+        </div>
+      // scalastyle:on
+    } else {
+      Seq.empty[Node]
+    }
+  }
 }
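
A hedged usage sketch for the two helpers added above (both live in the
private[spark] UIUtils object; the formatted output depends on the JVM default
timezone, shown here for US Pacific):

    import java.{util => ju}
    import java.lang.{Long => JLong}
    import org.apache.spark.ui.UIUtils

    // batchInterval < 1000 ms keeps milliseconds; >= 1000 ms drops them.
    UIUtils.formatBatchTime(1580334231531L, batchInterval = 500)
    // e.g. "2020/01/29 13:43:51.531"
    UIUtils.formatBatchTime(1580334231531L, batchInterval = 1000, showYYYYMMSS = false)
    // e.g. "13:43:51"

    // durationDataPadding fills labels missing in a batch with 0d so stacked charts align.
    val b0 = new ju.HashMap[String, JLong](); b0.put("addBatch", 100L)
    val b1 = new ju.HashMap[String, JLong](); b1.put("getBatch", 10L)
    UIUtils.durationDataPadding(Array((0L, b0), (1L, b1)))
    // => Array((0, Map(addBatch -> 100.0, getBatch -> 0.0)),
    //          (1, Map(addBatch -> 0.0, getBatch -> 10.0)))
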
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 4dc5349..ccaa70b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -721,19 +721,7 @@ private[ui] class TaskPagedTable(
       } else {
         error
       })
-    val details = if (isMultiline) {
-      // scalastyle:off
-      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
-            class="expand-details">
-        +details
-      </span> ++
-        <div class="stacktrace-details collapsed">
-          <pre>{error}</pre>
-        </div>
-      // scalastyle:on
-    } else {
-      ""
-    }
+    val details = UIUtils.detailsUINode(isMultiline, error)
     <td>{errorSummary}{details}</td>
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index ac431c9..a7d38e9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -309,19 +309,7 @@ private[ui] class StagePagedTable(
       } else {
         failureReason
       })
-    val details = if (isMultiline) {
-      // scalastyle:off
-      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
-            class="expand-details">
-        +details
-      </span> ++
-        <div class="stacktrace-details collapsed">
-          <pre>{failureReason}</pre>
-        </div>
-      // scalastyle:on
-    } else {
-      ""
-    }
+    val details = UIUtils.detailsUINode(isMultiline, failureReason)
     <td valign="middle">{failureReasonSummary}{details}</td>
   }
 
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 68e9313..65ffa22 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -489,7 +489,10 @@ object MimaExcludes {
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegressionModel.setPredictionCol"),
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setFeaturesCol"),
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setLabelCol"),
-    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setPredictionCol")
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setPredictionCol"),
+
+    // [SPARK-29543][SS][UI] Init structured streaming ui
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStartedEvent.this")
   )
 
   // Exclude rules for 2.4.x
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0e0a814..e13d65b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1150,6 +1150,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_UI_ENABLED =
+    buildConf("spark.sql.streaming.ui.enabled")
+      .doc("Whether to run the structured streaming UI for the Spark application.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val STREAMING_UI_INACTIVE_QUERY_RETENTION =
+    buildConf("spark.sql.streaming.ui.numInactiveQueries")
+      .doc("The number of inactive queries to retain for structured streaming ui.")
+      .intConf
+      .createWithDefault(100)
+
   val VARIABLE_SUBSTITUTE_ENABLED =
     buildConf("spark.sql.variable.substitute")
       .doc("This enables substitution using syntax like ${var} ${system:var} and ${env:var}.")
@@ -2262,6 +2274,10 @@ class SQLConf extends Serializable with Logging {
 
   def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
 
+  def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED)
+
+  def streamingUIInactiveQueryRetention: Int = getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION)
+
   def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
 
   def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
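
For illustration, the two new knobs as a user would set them when building a
session; the values here are arbitrary assumptions, not recommendations:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.sql.streaming.ui.enabled", "true")           // default: true
      .config("spark.sql.streaming.ui.numInactiveQueries", "50")  // default: 100
      .getOrCreate()
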
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 71bcd53..f20291e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -148,8 +148,8 @@ trait ProgressReporter extends Logging {
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
     val executionStats = extractExecutionStats(hasNewData)
-    val processingTimeSec = Math.max(1L,
-      currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
+    val processingTimeMillis = currentTriggerEndTimestamp - currentTriggerStartTimestamp
+    val processingTimeSec = Math.max(1L, processingTimeMillis).toDouble / MILLIS_PER_SECOND
 
     val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
       (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
@@ -181,6 +181,7 @@ trait ProgressReporter extends Logging {
       name = name,
       timestamp = formatTimestamp(currentTriggerStartTimestamp),
       batchId = currentBatchId,
+      batchDuration = processingTimeMillis,
       durationMs = new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).asJava),
       eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
       stateOperators = executionStats.stateOperators.toArray,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6dff5c6..ed908a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -307,7 +307,8 @@ abstract class StreamExecution(
       }
 
       // `postEvent` does not throw non fatal exception.
-      postEvent(new QueryStartedEvent(id, runId, name))
+      val submissionTime = triggerClock.getTimeMillis()
+      postEvent(new QueryStartedEvent(id, runId, name, submissionTime))
 
       // Unblock starting thread
       startLatch.countDown()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index de3805e..fefd72d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.internal
 
 import java.net.URL
-import java.util.{Locale, UUID}
+import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 import javax.annotation.concurrent.GuardedBy
 
@@ -36,6 +36,8 @@ import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.sql.streaming.StreamingQueryListener
+import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.Utils
 
@@ -139,6 +141,21 @@ private[sql] class SharedState(
   }
 
   /**
+   * A [[StreamingQueryListener]] for the structured streaming UI; it holds all the
+   * streaming query UI data to show.
+   */
+  lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
+    val sqlConf = SQLConf.get
+    if (sqlConf.isStreamingUIEnabled) {
+      val statusListener = new StreamingQueryStatusListener(sqlConf)
+      sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _))
+      Some(statusListener)
+    } else {
+      None
+    }
+  }
+
+  /**
    * A catalog that interacts with external systems.
    */
   lazy val externalCatalog: ExternalCatalogWithListener = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index cc81cf6..dd842cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -82,13 +82,15 @@ object StreamingQueryListener {
    * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
    * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
    * @param name User-specified name of the query, null if not specified.
+   * @param submissionTime The timestamp at which the query was submitted.
    * @since 2.1.0
    */
   @Evolving
   class QueryStartedEvent private[sql](
       val id: UUID,
       val runId: UUID,
-      val name: String) extends Event
+      val name: String,
+      val submissionTime: Long) extends Event
 
   /**
    * Event representing any progress updates in a query.
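
A hedged sketch of a user listener consuming the new field; the class name and
log text are invented for illustration:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    class SubmissionTimeLogger extends StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        // submissionTime is epoch milliseconds taken from the trigger clock.
        println(s"query ${event.id} (run ${event.runId}) submitted at ${event.submissionTime}")
      override def onQueryProgress(event: QueryProgressEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    }

    // spark.streams.addListener(new SubmissionTimeLogger())
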
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 810f4a1..4d0d8ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
@@ -37,7 +38,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * A class to manage all the [[StreamingQuery]] active in a `SparkSession`.
@@ -68,6 +69,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
         logInfo(s"Registered listener ${listener.getClass.getName}")
       })
     }
+    sparkSession.sharedState.streamingQueryStatusListener.foreach { listener =>
+      addListener(listener)
+    }
   } catch {
     case e: Exception =>
       throw new SparkException("Exception when registering StreamingQueryListener", e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index a9681db..13b506b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -85,6 +85,7 @@ class StateOperatorProgress private[sql](
  *                case of retries after a failure a given batchId my be executed more than once.
  *                Similarly, when there is no data to be processed, the batchId will not be
  *                incremented.
+ * @param batchDuration The duration of the batch's processing in milliseconds.
  * @param durationMs The amount of time taken to perform various operations in milliseconds.
  * @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
  *                 {{{
@@ -105,6 +106,7 @@ class StreamingQueryProgress private[sql](
   val name: String,
   val timestamp: String,
   val batchId: Long,
+  val batchDuration: Long,
   val durationMs: ju.Map[String, JLong],
   val eventTime: ju.Map[String, String],
   val stateOperators: Array[StateOperatorProgress],
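
For illustration, reading the new field off a query's most recent progress;
assumes `query` is a started StreamingQuery with at least one completed batch:

    val p = query.lastProgress  // null until the first progress update
    if (p != null) {
      println(s"batch ${p.batchId} ran for ${p.batchDuration} ms; " +
        s"per-operation breakdown: ${p.durationMs}")
    }
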
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
new file mode 100644
index 0000000..650f64f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.text.SimpleDateFormat
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+import org.apache.spark.sql.streaming.ui.UIUtils._
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
+
+private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
+    extends WebUIPage("") with Logging {
+  val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+  df.setTimeZone(getTimeZone("UTC"))
+
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val content = generateStreamingQueryTable(request)
+    SparkUIUtils.headerSparkPage(request, "Streaming Query", content, parent)
+  }
+
+  def generateDataRow(request: HttpServletRequest, queryActive: Boolean)
+    (query: StreamingQueryUIData): Seq[Node] = {
+
+    def details(detail: Any): Seq[Node] = {
+      if (queryActive) {
+        return Seq.empty[Node]
+      }
+      val detailString = detail.asInstanceOf[String]
+      val isMultiline = detailString.indexOf('\n') >= 0
+      val summary = StringEscapeUtils.escapeHtml4(
+        if (isMultiline) detailString.substring(0, detailString.indexOf('\n')) else detailString
+      )
+      val details = SparkUIUtils.detailsUINode(isMultiline, detailString)
+      <td>{summary}{details}</td>
+    }
+
+    val statisticsLink = "%s/%s/statistics?id=%s"
+      .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, query.runId)
+
+    val name = UIUtils.getQueryName(query)
+    val status = UIUtils.getQueryStatus(query)
+    val duration = if (queryActive) {
+      SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
+    } else {
+      withNoProgress(query, {
+        val endTimeMs = query.lastProgress.timestamp
+        SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submissionTime)
+      }, "-")
+    }
+
+    <tr>
+      <td> {name} </td>
+      <td> {status} </td>
+      <td> {query.id} </td>
+      <td> <a href={statisticsLink}> {query.runId} </a> </td>
+      <td> {SparkUIUtils.formatDate(query.submissionTime)} </td>
+      <td> {duration} </td>
+      <td> {withNoProgress(query, {
+        (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
+          query.recentProgress.length).formatted("%.2f") }, "NaN")}
+      </td>
+      <td> {withNoProgress(query, {
+        (query.recentProgress.map(p => withNumberInvalid(p.processedRowsPerSecond)).sum /
+          query.recentProgress.length).formatted("%.2f") }, "NaN")}
+      </td>
+      <td> {withNoProgress(query, { query.lastProgress.batchId }, "NaN")} </td>
+      {details(query.exception.getOrElse("-"))}
+    </tr>
+  }
+
+  private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
+    val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
+      .partition(_.isActive)
+    val activeQueryTables = if (activeQueries.nonEmpty) {
+      val headerRow = Seq(
+        "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec",
+        "Avg Process /sec", "Lastest Batch")
+
+      Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true),
+        activeQueries, true, None, Seq(null), false))
+    } else {
+      None
+    }
+
+    val inactiveQueryTables = if (inactiveQueries.nonEmpty) {
+      val headerRow = Seq(
+        "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec",
+        "Avg Process /sec", "Lastest Batch", "Error")
+
+      Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false),
+        inactiveQueries, true, None, Seq(null), false))
+    } else {
+      None
+    }
+
+    // scalastyle:off
+    val content =
+      <span id="completed" class="collapse-aggregated-activeQueries collapse-table"
+            onClick="collapseTable('collapse-aggregated-activeQueries','aggregated-activeQueries')">
+        <h5 id="activequeries">
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Active Streaming Queries ({activeQueries.length})</a>
+        </h5>
+      </span> ++
+      <div>
+        <ul class="aggregated-activeQueries collapsible-table">
+          {activeQueryTables.getOrElse(Seq.empty[Node])}
+        </ul>
+      </div> ++
+      <span id="completed" class="collapse-aggregated-completedQueries collapse-table"
+            onClick="collapseTable('collapse-aggregated-completedQueries','aggregated-completedQueries')">
+        <h5 id="completedqueries">
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Completed Streaming Queries ({inactiveQueries.length})</a>
+        </h5>
+      </span> ++
+      <div>
+        <ul class="aggregated-completedQueries collapsible-table">
+          {inactiveQueryTables.getOrElse(Seq.empty[Node])}
+        </ul>
+      </div>
+    // scalastyle:on
+
+    content
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
new file mode 100644
index 0000000..56672ce
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.text.SimpleDateFormat
+import java.util.UUID
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, Unparsed}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+import org.apache.spark.sql.streaming.ui.UIUtils._
+import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
+
+private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
+  extends WebUIPage("statistics") with Logging {
+  val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+  df.setTimeZone(getTimeZone("UTC"))
+
+  def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
+    // scalastyle:off
+    <script src={SparkUIUtils.prependBaseUri(request, "/static/d3.min.js")}></script>
+        <link rel="stylesheet" href={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.css")} type="text/css"/>
+      <script src={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.js")}></script>
+      <script src={SparkUIUtils.prependBaseUri(request, "/static/structured-streaming-page.js")}></script>
+    // scalastyle:on
+  }
+
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val parameterId = request.getParameter("id")
+    require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+    val query = parent.statusListener.allQueryStatus.find { case q =>
+      q.runId.equals(UUID.fromString(parameterId))
+    }.getOrElse(throw new IllegalArgumentException(s"Failed to find streaming query $parameterId"))
+
+    val resources = generateLoadResources(request)
+    val basicInfo = generateBasicInfo(query)
+    val content =
+      resources ++
+        basicInfo ++
+        generateStatTable(query)
+    SparkUIUtils.headerSparkPage(request, "Streaming Query Statistics", content, parent)
+  }
+
+  def generateTimeMap(times: Seq[Long]): Seq[Node] = {
+    val js = "var timeFormat = {};\n" + times.map { time =>
+      val formattedTime = SparkUIUtils.formatBatchTime(time, 1, showYYYYMMSS = false)
+      s"timeFormat[$time] = '$formattedTime';"
+    }.mkString("\n")
+
+    <script>{Unparsed(js)}</script>
+  }
+
+  def generateVar(values: Array[(Long, ju.Map[String, JLong])]): Seq[Node] = {
+    val durationDataPadding = SparkUIUtils.durationDataPadding(values)
+    val js = "var timeToValues = {};\n" + durationDataPadding.map { case (x, y) =>
+      val s = y.toSeq.sortBy(_._1).map(e => s""""${e._2}"""").mkString("[", ",", "]")
+      s"""timeToValues["${SparkUIUtils.formatBatchTime(x, 1, showYYYYMMSS = false)}"] = $s;"""
+    }.mkString("\n")
+
+    <script>{Unparsed(js)}</script>
+  }
+
+  def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
+    val duration = if (query.isActive) {
+      SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
+    } else {
+      withNoProgress(query, {
+        val end = query.lastProgress.timestamp
+        val start = query.recentProgress.head.timestamp
+        SparkUIUtils.formatDurationVerbose(
+          df.parse(end).getTime - df.parse(start).getTime)
+      }, "-")
+    }
+
+    val name = UIUtils.getQueryName(query)
+    val numBatches = withNoProgress(query, { query.lastProgress.batchId + 1L }, 0)
+    <div>Running batches for
+      <strong>
+        {duration}
+      </strong>
+      since
+      <strong>
+        {SparkUIUtils.formatDate(query.submissionTime)}
+      </strong>
+      (<strong>{numBatches}</strong> completed batches)
+    </div>
+    <br />
+    <div><strong>Name: </strong>{name}</div>
+    <div><strong>Id: </strong>{query.id}</div>
+    <div><strong>RunId: </strong>{query.runId}</div>
+    <br />
+  }
+
+  def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
+    val batchTimes = withNoProgress(query,
+      query.recentProgress.map(p => df.parse(p.timestamp).getTime), Array.empty[Long])
+    val minBatchTime =
+      withNoProgress(query, df.parse(query.recentProgress.head.timestamp).getTime, 0L)
+    val maxBatchTime =
+      withNoProgress(query, df.parse(query.lastProgress.timestamp).getTime, 0L)
+    val maxRecordRate =
+      withNoProgress(query, query.recentProgress.map(_.inputRowsPerSecond).max, 0L)
+    val minRecordRate = 0L
+    val maxProcessRate =
+      withNoProgress(query, query.recentProgress.map(_.processedRowsPerSecond).max, 0L)
+
+    val minProcessRate = 0L
+    val maxRows = withNoProgress(query, query.recentProgress.map(_.numInputRows).max, 0L)
+    val minRows = 0L
+    val maxBatchDuration = withNoProgress(query, query.recentProgress.map(_.batchDuration).max, 0L)
+    val minBatchDuration = 0L
+
+    val inputRateData = withNoProgress(query,
+      query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+        withNumberInvalid { p.inputRowsPerSecond })), Array.empty[(Long, Double)])
+    val processRateData = withNoProgress(query,
+      query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+        withNumberInvalid { p.processedRowsPerSecond })), Array.empty[(Long, Double)])
+    val inputRowsData = withNoProgress(query,
+      query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+        withNumberInvalid { p.numInputRows })), Array.empty[(Long, Double)])
+    val batchDurations = withNoProgress(query,
+      query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+        withNumberInvalid { p.batchDuration })), Array.empty[(Long, Double)])
+    val operationDurationData = withNoProgress(query, query.recentProgress.map { p =>
+      val durationMs = p.durationMs
+      // remove "triggerExecution" as it count the other operation duration.
+      durationMs.remove("triggerExecution")
+      (df.parse(p.timestamp).getTime, durationMs)}, Array.empty[(Long, ju.Map[String, JLong])])
+
+    val jsCollector = new JsCollector
+    val graphUIDataForInputRate =
+      new GraphUIData(
+        "input-rate-timeline",
+        "input-rate-histogram",
+        inputRateData,
+        minBatchTime,
+        maxBatchTime,
+        minRecordRate,
+        maxRecordRate,
+        "records/sec")
+    graphUIDataForInputRate.generateDataJs(jsCollector)
+
+    val graphUIDataForProcessRate =
+      new GraphUIData(
+        "process-rate-timeline",
+        "process-rate-histogram",
+        processRateData,
+        minBatchTime,
+        maxBatchTime,
+        minProcessRate,
+        maxProcessRate,
+        "records/sec")
+    graphUIDataForProcessRate.generateDataJs(jsCollector)
+
+    val graphUIDataForInputRows =
+      new GraphUIData(
+        "input-rows-timeline",
+        "input-rows-histogram",
+        inputRowsData,
+        minBatchTime,
+        maxBatchTime,
+        minRows,
+        maxRows,
+        "records")
+    graphUIDataForInputRows.generateDataJs(jsCollector)
+
+    val graphUIDataForBatchDuration =
+      new GraphUIData(
+        "batch-duration-timeline",
+        "batch-duration-histogram",
+        batchDurations,
+        minBatchTime,
+        maxBatchTime,
+        minBatchDuration,
+        maxBatchDuration,
+        "ms")
+    graphUIDataForBatchDuration.generateDataJs(jsCollector)
+
+    val graphUIDataForDuration =
+      new GraphUIData(
+        "duration-area-stack",
+        "",
+        Seq.empty[(Long, Double)],
+        0L,
+        0L,
+        0L,
+        0L,
+        "ms")
+
+    val table =
+    // scalastyle:off
+      <table id="stat-table" class="table table-bordered" style="width: auto">
+        <thead>
+          <tr>
+            <th style="width: 160px;"></th>
+            <th style="width: 492px;">Timelines</th>
+            <th style="width: 350px;">Histograms</th></tr>
+        </thead>
+        <tbody>
+          <tr>
+            <td style="vertical-align: middle;">
+              <div style="width: 160px;">
+                <div><strong>Input Rate {SparkUIUtils.tooltip("The aggregate (across all sources) rate of data arriving.", "right")}</strong></div>
+              </div>
+            </td>
+            <td class="timeline">{graphUIDataForInputRate.generateTimelineHtml(jsCollector)}</td>
+            <td class="histogram">{graphUIDataForInputRate.generateHistogramHtml(jsCollector)}</td>
+          </tr>
+          <tr>
+            <td style="vertical-align: middle;">
+              <div style="width: 160px;">
+                <div><strong>Process Rate {SparkUIUtils.tooltip("The aggregate (across all sources) rate at which Spark is processing data.", "right")}</strong></div>
+              </div>
+            </td>
+            <td class="timeline">{graphUIDataForProcessRate.generateTimelineHtml(jsCollector)}</td>
+            <td class="histogram">{graphUIDataForProcessRate.generateHistogramHtml(jsCollector)}</td>
+          </tr>
+          <tr>
+            <td style="vertical-align: middle;">
+              <div style="width: 160px;">
+                <div><strong>Input Rows {SparkUIUtils.tooltip("The aggregate (across all sources) number of records processed in a trigger.", "right")}</strong></div>
+              </div>
+            </td>
+            <td class="timeline">{graphUIDataForInputRows.generateTimelineHtml(jsCollector)}</td>
+            <td class="histogram">{graphUIDataForInputRows.generateHistogramHtml(jsCollector)}</td>
+          </tr>
+          <tr>
+            <td style="vertical-align: middle;">
+              <div style="width: 160px;">
+                <div><strong>Batch Duration {SparkUIUtils.tooltip("The processing time of each batch.", "right")}</strong></div>
+              </div>
+            </td>
+            <td class="timeline">{graphUIDataForBatchDuration.generateTimelineHtml(jsCollector)}</td>
+            <td class="histogram">{graphUIDataForBatchDuration.generateHistogramHtml(jsCollector)}</td>
+          </tr>
+          <tr>
+            <td style="vertical-align: middle;">
+              <div style="width: auto;">
+                <div><strong>Operation Duration {SparkUIUtils.tooltip("The amount of time taken to perform various operations in milliseconds.", "right")}</strong></div>
+              </div>
+            </td>
+            <td class="duration-area-stack" colspan="2">{graphUIDataForDuration.generateAreaStackHtmlWithData(jsCollector, operationDurationData)}</td>
+          </tr>
+        </tbody>
+      </table>
+    // scalastyle:on
+
+    generateVar(operationDurationData) ++ generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
new file mode 100644
index 0000000..db085db
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.text.SimpleDateFormat
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+
+/**
+ * A customized StreamingQueryListener used in the structured streaming UI, which holds all
+ * UI data for both active and inactive queries.
+ * TODO: Add support for history server.
+ */
+private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {
+
+  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+  /**
+   * We use runId instead of id as the key in the active query status map,
+   * because runId is unique for every started query, even if it is a restart.
+   */
+  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
+  private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
+
+  private val streamingProgressRetention = sqlConf.streamingProgressRetention
+  private val inactiveQueryStatusRetention = sqlConf.streamingUIInactiveQueryRetention
+
+  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+    activeQueryStatus.putIfAbsent(event.runId,
+      new StreamingQueryUIData(event.name, event.id, event.runId, event.submissionTime))
+  }
+
+  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+    val batchTimestamp = timestampFormat.parse(event.progress.timestamp).getTime
+    val queryStatus = activeQueryStatus.getOrDefault(
+      event.progress.runId,
+      new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
+        batchTimestamp))
+    queryStatus.updateProcess(event.progress, streamingProgressRetention)
+  }
+
+  override def onQueryTerminated(
+      event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
+    val queryStatus = activeQueryStatus.remove(event.runId)
+    if (queryStatus != null) {
+      queryStatus.queryTerminated(event)
+      inactiveQueryStatus += queryStatus
+      while (inactiveQueryStatus.length > inactiveQueryStatusRetention) {
+        inactiveQueryStatus.dequeue()
+      }
+    }
+  }
+
+  def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
+    activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
+  }
+}
+
+/**
+ * This class contains all the data needed for UI display; each instance corresponds to a single
+ * [[org.apache.spark.sql.streaming.StreamingQuery]].
+ */
+private[ui] class StreamingQueryUIData(
+    val name: String,
+    val id: UUID,
+    val runId: UUID,
+    val submissionTime: Long) {
+
+  /** Holds the most recent query progress updates. */
+  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+  private var _isActive = true
+  private var _exception: Option[String] = None
+
+  def isActive: Boolean = synchronized { _isActive }
+
+  def exception: Option[String] = synchronized { _exception }
+
+  def queryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
+    _isActive = false
+    _exception = event.exception
+  }
+
+  def updateProcess(
+      newProgress: StreamingQueryProgress, retentionNum: Int): Unit = progressBuffer.synchronized {
+    progressBuffer += newProgress
+    while (progressBuffer.length > retentionNum) {
+      progressBuffer.dequeue()
+    }
+  }
+
+  def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+    progressBuffer.toArray
+  }
+
+  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+    progressBuffer.lastOption.orNull
+  }
+}
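
For illustration, a minimal sketch of driving this listener by hand, assuming the snippet lives under an org.apache.spark.sql package (the event constructors and `sqlContext.conf` are private[sql]); it mirrors the event wiring exercised in StreamingQueryStatusListenerSuite further below:

    import java.util.UUID

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.ui.StreamingQueryStatusListener

    val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
    val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)

    // Simulate one query lifecycle: started, then terminated without an error.
    val (id, runId) = (UUID.randomUUID(), UUID.randomUUID())
    listener.onQueryStarted(
      new StreamingQueryListener.QueryStartedEvent(id, runId, "demo", System.currentTimeMillis()))
    listener.onQueryTerminated(
      new StreamingQueryListener.QueryTerminatedEvent(id, runId, None))

    // The query is now tracked as inactive ("FINISHED") and visible to the UI pages.
    assert(listener.allQueryStatus.count(!_.isActive) == 1)
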
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
similarity index 54%
copy from streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
copy to sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
index 3ecc448..f909cfd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
@@ -14,35 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.spark.streaming.ui
+package org.apache.spark.sql.streaming.ui
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-/**
- * Spark Web UI tab that shows statistics of a streaming job.
- * This assumes the given SparkContext has enabled its SparkUI.
- */
-private[spark] class StreamingTab(val ssc: StreamingContext, sparkUI: SparkUI)
-  extends SparkUITab(sparkUI, "streaming") with Logging {
+private[sql] class StreamingQueryTab(
+    val statusListener: StreamingQueryStatusListener,
+    sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
 
-  private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static"
+  override val name = "Structured Streaming"
 
   val parent = sparkUI
-  val listener = ssc.progressListener
 
-  attachPage(new StreamingPage(this))
-  attachPage(new BatchPage(this))
+  attachPage(new StreamingQueryPage(this))
+  attachPage(new StreamingQueryStatisticsPage(this))
+  parent.attachTab(this)
 
-  def attach(): Unit = {
-    parent.attachTab(this)
-    parent.addStaticHandler(STATIC_RESOURCE_DIR, "/static/streaming")
-  }
+  parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql")
+}
 
-  def detach(): Unit = {
-    parent.detachTab(this)
-    parent.detachHandler("/static/streaming")
-  }
+object StreamingQueryTab {
+  private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
 }
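
Note that, unlike the old StreamingTab with its explicit attach()/detach() methods, this tab registers itself (and its static resource handler) from its constructor. A minimal sketch of the wiring, assuming a live SparkUI is at hand and the code sits under an org.apache.spark.sql package (the classes involved are private[sql]); attachStreamingQueryTab is a hypothetical helper, not part of this patch:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
    import org.apache.spark.ui.SparkUI

    def attachStreamingQueryTab(spark: SparkSession, ui: SparkUI): StreamingQueryTab = {
      val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
      spark.streams.addListener(listener)  // feed StreamingQueryListener events to the listener
      new StreamingQueryTab(listener, ui)  // the constructor calls parent.attachTab(this)
    }
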
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
new file mode 100644
index 0000000..57b9dec
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+private[ui] object UIUtils {
+
+  /**
+   * Check whether `number` is valid; if not, return 0.0d.
+   */
+  def withNumberInvalid(number: => Double): Double = {
+    if (number.isNaN || number.isInfinite) {
+      0.0d
+    } else {
+      number
+    }
+  }
+
+  /**
+   * Execute a block of code when the streaming query has at least one completed batch;
+   * otherwise return the `default` value.
+   */
+  def withNoProgress[T](query: StreamingQueryUIData, body: => T, default: T): T = {
+    if (query.lastProgress != null) {
+      body
+    } else {
+      default
+    }
+  }
+
+  def getQueryName(query: StreamingQueryUIData): String = {
+    if (query.name == null || query.name.isEmpty) {
+      "<no name>"
+    } else {
+      query.name
+    }
+  }
+
+  def getQueryStatus(query: StreamingQueryUIData): String = {
+    if (query.isActive) {
+      "RUNNING"
+    } else {
+      query.exception.map(_ => "FAILED").getOrElse("FINISHED")
+    }
+  }
+}
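
A minimal sketch of how these helpers compose when rendering one row of the query table, assuming it runs inside the org.apache.spark.sql.streaming.ui package (the object is private[ui]); describeQuery is a hypothetical function for illustration:

    // `query` carries the UI data collected by StreamingQueryStatusListener.
    def describeQuery(query: StreamingQueryUIData): String = {
      val name = UIUtils.getQueryName(query)      // "<no name>" for unnamed queries
      val status = UIUtils.getQueryStatus(query)  // RUNNING, FINISHED or FAILED
      // Guard first against "no batch completed yet" (lastProgress is null),
      // then against NaN/Infinity rates reported for the very first batch.
      val rate = UIUtils.withNoProgress(query,
        UIUtils.withNumberInvalid(query.lastProgress.inputRowsPerSecond), 0.0)
      s"$name [$status] input rate: $rate rows/sec"
    }
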
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 2f66dd32..9d0f829 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable
 
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.concurrent.Waiters.Waiter
 
@@ -34,6 +33,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryListener._
+import org.apache.spark.sql.streaming.ui.StreamingQueryStatusListener
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.util.JsonProtocol
 
@@ -47,7 +47,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   after {
     spark.streams.active.foreach(_.stop())
     assert(spark.streams.active.isEmpty)
-    assert(spark.streams.listListeners().isEmpty)
+    // Skip check default `StreamingQueryStatusListener` which is for streaming UI.
+    assert(spark.streams.listListeners()
+      .filterNot(_.isInstanceOf[StreamingQueryStatusListener]).isEmpty)
     // Make sure we don't leak any events to the next test
     spark.sparkContext.listenerBus.waitUntilEmpty()
   }
@@ -252,8 +254,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       assert(newEvent.name === event.name)
     }
 
-    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name"))
-    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null))
+    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name", 1L))
+    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null, 1L))
   }
 
   test("QueryProgressEvent serialization") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index b6a6be2..6f00b52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -312,6 +312,7 @@ object StreamingQueryStatusAndProgressSuite {
     name = "myName",
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
+    batchDuration = 0L,
     durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava),
     eventTime = new java.util.HashMap(Map(
       "max" -> "2016-12-05T20:54:20.827Z",
@@ -346,6 +347,7 @@ object StreamingQueryStatusAndProgressSuite {
     name = null, // should not be present in the json
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
+    batchDuration = 0L,
     durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava),
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4121f49..77f5c85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -466,7 +466,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     val streamingTriggerDF = spark.createDataset(1 to 10).toDF
     val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
 
-    val progress = getFirstProgress(streamingInputDF.join(streamingInputDF, "value"))
+    val progress = getStreamingQuery(streamingInputDF.join(streamingInputDF, "value"))
+      .recentProgress.head
     assert(progress.numInputRows === 20) // data is read multiple times in self-joins
     assert(progress.sources.size === 1)
     assert(progress.sources(0).numInputRows === 20)
@@ -479,7 +480,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
     // Trigger input has 10 rows, static input has 2 rows,
     // therefore after the first trigger, the calculated input rows should be 10
-    val progress = getFirstProgress(streamingInputDF.join(staticInputDF, "value"))
+    val progress = getStreamingQuery(streamingInputDF.join(staticInputDF, "value"))
+      .recentProgress.head
     assert(progress.numInputRows === 10)
     assert(progress.sources.size === 1)
     assert(progress.sources(0).numInputRows === 10)
@@ -492,7 +494,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF)
 
     // After the first trigger, the calculated input rows should be 10
-    val progress = getFirstProgress(streamingInputDF)
+    val progress = getStreamingQuery(streamingInputDF).recentProgress.head
     assert(progress.numInputRows === 10)
     assert(progress.sources.size === 1)
     assert(progress.sources(0).numInputRows === 10)
@@ -1120,12 +1122,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     StreamingExecutionRelation(source, spark)
   }
 
-  /** Returns the query progress at the end of the first trigger of streaming DF */
-  private def getFirstProgress(streamingDF: DataFrame): StreamingQueryProgress = {
+  /** Returns the query at the end of the first trigger of streaming DF */
+  private def getStreamingQuery(streamingDF: DataFrame): StreamingQuery = {
     try {
       val q = streamingDF.writeStream.format("memory").queryName("test").start()
       q.processAllAvailable()
-      q.recentProgress.head
+      q
     } finally {
       spark.streams.active.map(_.stop())
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
new file mode 100644
index 0000000..de43e47
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.util.{Locale, UUID}
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+import org.apache.spark.sql.test.SharedSparkSession
+
+class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
+
+  test("correctly display streaming query page") {
+    val id = UUID.randomUUID()
+    val request = mock(classOf[HttpServletRequest])
+    val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
+    val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+    when(tab.statusListener).thenReturn(statusListener)
+
+    val streamQuery = createStreamQueryUIData(id)
+    when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
+    var html = renderStreamingQueryPage(request, tab)
+      .toString().toLowerCase(Locale.ROOT)
+    assert(html.contains("active streaming queries (1)"))
+    assert(html.contains("completed streaming queries (0)"))
+
+    when(streamQuery.isActive).thenReturn(false)
+    when(streamQuery.exception).thenReturn(None)
+    html = renderStreamingQueryPage(request, tab)
+      .toString().toLowerCase(Locale.ROOT)
+    assert(html.contains("active streaming queries (0)"))
+    assert(html.contains("completed streaming queries (1)"))
+    assert(html.contains("finished"))
+
+    when(streamQuery.isActive).thenReturn(false)
+    when(streamQuery.exception).thenReturn(Option("exception in query"))
+    html = renderStreamingQueryPage(request, tab)
+      .toString().toLowerCase(Locale.ROOT)
+    assert(html.contains("active streaming queries (0)"))
+    assert(html.contains("completed streaming queries (1)"))
+    assert(html.contains("failed"))
+    assert(html.contains("exception in query"))
+  }
+
+  test("correctly display streaming query statistics page") {
+    val id = UUID.randomUUID()
+    val request = mock(classOf[HttpServletRequest])
+    val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
+    val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+    when(request.getParameter("id")).thenReturn(id.toString)
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+    when(tab.statusListener).thenReturn(statusListener)
+
+    val streamQuery = createStreamQueryUIData(id)
+    when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
+    val html = renderStreamingQueryStatisticsPage(request, tab)
+      .toString().toLowerCase(Locale.ROOT)
+
+    assert(html.contains("<strong>name: </strong>query<"))
+    assert(html.contains("""{"x": 1001898000100, "y": 10.0}"""))
+    assert(html.contains("""{"x": 1001898000100, "y": 12.0}"""))
+    assert(html.contains("(<strong>3</strong> completed batches)"))
+  }
+
+  private def createStreamQueryUIData(id: UUID): StreamingQueryUIData = {
+    val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+    when(progress.timestamp).thenReturn("2001-10-01T01:00:00.100Z")
+    when(progress.inputRowsPerSecond).thenReturn(10.0)
+    when(progress.processedRowsPerSecond).thenReturn(12.0)
+    when(progress.batchId).thenReturn(2)
+    when(progress.prettyJson).thenReturn("""{"a":1}""")
+
+    val streamQuery = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
+    when(streamQuery.isActive).thenReturn(true)
+    when(streamQuery.name).thenReturn("query")
+    when(streamQuery.id).thenReturn(id)
+    when(streamQuery.runId).thenReturn(id)
+    when(streamQuery.submissionTime).thenReturn(1L)
+    when(streamQuery.lastProgress).thenReturn(progress)
+    when(streamQuery.recentProgress).thenReturn(Array(progress))
+    when(streamQuery.exception).thenReturn(None)
+
+    streamQuery
+  }
+
+  /**
+   * Render the streaming query page for the given request and tab,
+   * and return the resulting HTML nodes.
+   */
+  private def renderStreamingQueryPage(
+      request: HttpServletRequest,
+      tab: StreamingQueryTab): Seq[Node] = {
+    val page = new StreamingQueryPage(tab)
+    page.render(request)
+  }
+
+  private def renderStreamingQueryStatisticsPage(
+      request: HttpServletRequest,
+      tab: StreamingQueryTab): Seq[Node] = {
+    val page = new StreamingQueryStatisticsPage(tab)
+    page.render(request)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
new file mode 100644
index 0000000..bd74ed3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.util.UUID
+
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+
+import org.apache.spark.sql.streaming
+import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest}
+
+class StreamingQueryStatusListenerSuite extends StreamTest {
+
+  test("onQueryStarted, onQueryProgress, onQueryTerminated") {
+    val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
+
+    // handle query started event
+    val id = UUID.randomUUID()
+    val runId = UUID.randomUUID()
+    val startEvent = new StreamingQueryListener.QueryStartedEvent(id, runId, "test", 1L)
+    listener.onQueryStarted(startEvent)
+
+    // result checking
+    assert(listener.activeQueryStatus.size() == 1)
+    assert(listener.activeQueryStatus.get(runId).name == "test")
+
+    // handle query progress event
+    val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+    when(progress.id).thenReturn(id)
+    when(progress.runId).thenReturn(runId)
+    when(progress.timestamp).thenReturn("2001-10-01T01:00:00.100Z")
+    when(progress.inputRowsPerSecond).thenReturn(10.0)
+    when(progress.processedRowsPerSecond).thenReturn(12.0)
+    when(progress.batchId).thenReturn(2)
+    when(progress.prettyJson).thenReturn("""{"a":1}""")
+    val processEvent = new streaming.StreamingQueryListener.QueryProgressEvent(progress)
+    listener.onQueryProgress(processEvent)
+
+    // result checking
+    val activeQuery = listener.activeQueryStatus.get(runId)
+    assert(activeQuery.isActive)
+    assert(activeQuery.recentProgress.length == 1)
+    assert(activeQuery.lastProgress.id == id)
+    assert(activeQuery.lastProgress.runId == runId)
+    assert(activeQuery.lastProgress.timestamp == "2001-10-01T01:00:00.100Z")
+    assert(activeQuery.lastProgress.inputRowsPerSecond == 10.0)
+    assert(activeQuery.lastProgress.processedRowsPerSecond == 12.0)
+    assert(activeQuery.lastProgress.batchId == 2)
+    assert(activeQuery.lastProgress.prettyJson == """{"a":1}""")
+
+    // handle terminate event
+    val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None)
+    listener.onQueryTerminated(terminateEvent)
+
+    assert(!listener.inactiveQueryStatus.head.isActive)
+    assert(listener.inactiveQueryStatus.head.runId == runId)
+    assert(listener.inactiveQueryStatus.head.id == id)
+  }
+
+  test("same query start multiple times") {
+    val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
+
+    // handle first time start
+    val id = UUID.randomUUID()
+    val runId0 = UUID.randomUUID()
+    val startEvent0 = new StreamingQueryListener.QueryStartedEvent(id, runId0, "test", 1L)
+    listener.onQueryStarted(startEvent0)
+
+    // handle terminate event
+    val terminateEvent0 = new StreamingQueryListener.QueryTerminatedEvent(id, runId0, None)
+    listener.onQueryTerminated(terminateEvent0)
+
+    // handle second time start
+    val runId1 = UUID.randomUUID()
+    val startEvent1 = new StreamingQueryListener.QueryStartedEvent(id, runId1, "test", 1L)
+    listener.onQueryStarted(startEvent1)
+
+    // result checking
+    assert(listener.activeQueryStatus.size() == 1)
+    assert(listener.inactiveQueryStatus.length == 1)
+    assert(listener.activeQueryStatus.containsKey(runId1))
+    assert(listener.activeQueryStatus.get(runId1).id == id)
+    assert(listener.inactiveQueryStatus.head.runId == runId0)
+    assert(listener.inactiveQueryStatus.head.id == id)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala
new file mode 100644
index 0000000..46f2ead
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.Matchers
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+
+class UIUtilsSuite extends SparkFunSuite with Matchers {
+  test("streaming query started with no batch completed") {
+    val query = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
+    when(query.lastProgress).thenReturn(null)
+
+    assert(0 == UIUtils.withNoProgress(query, 1, 0))
+  }
+
+  test("streaming query started with at least one batch completed") {
+    val query = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
+    val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+    when(query.lastProgress).thenReturn(progress)
+
+    assert(1 == UIUtils.withNoProgress(query, 1, 0))
+  }
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index adfda0c5..890a668 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -375,21 +375,7 @@ private[ui] class SqlStatsPagedTable(
       } else {
         errorMessage
       })
-    val details = if (isMultiline) {
-      // scalastyle:off
-      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
-            class="expand-details">
-        + details
-      </span> ++
-        <div class="stacktrace-details collapsed">
-          <pre>
-            {errorMessage}
-          </pre>
-        </div>
-      // scalastyle:on
-    } else {
-      ""
-    }
+    val details = detailsUINode(isMultiline, errorMessage)
     <td>
       {errorSummary}{details}
     </td>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index e360b4a..6c981b2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -33,7 +33,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext.rddToFileName
 import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
 import org.apache.spark.util.{CallSite, Utils}
 
 /**
@@ -138,7 +138,7 @@ abstract class DStream[T: ClassTag] (
    */
   private def makeScope(time: Time): Option[RDDOperationScope] = {
     baseScope.map { bsJson =>
-      val formattedBatchTime = UIUtils.formatBatchTime(
+      val formattedBatchTime = SparkUIUtils.formatBatchTime(
         time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
       val bs = RDDOperationScope.fromJson(bsJson)
       val baseName = bs.name // e.g. countByWindow, "kafka stream [0]"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 5d543c5..7eea57c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.python.PythonDStream
-import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
 import org.apache.spark.util.{EventLoop, ThreadUtils, Utils}
 
 
@@ -230,7 +230,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       val oldProps = ssc.sparkContext.getLocalProperties
       try {
         ssc.sparkContext.setLocalProperties(Utils.cloneProperties(ssc.savedProperties.get()))
-        val formattedTime = UIUtils.formatBatchTime(
+        val formattedTime = SparkUIUtils.formatBatchTime(
           job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
         val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
         val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index f1070e9..b5a0e92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -51,7 +51,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
 
   protected def baseRow(batch: BatchUIData): Seq[Node] = {
     val batchTime = batch.batchTime.milliseconds
-    val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
+    val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
     val numRecords = batch.numRecords
     val schedulingDelay = batch.schedulingDelay
     val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 2c85d26..04cd063 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -325,7 +325,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         throw new IllegalArgumentException(s"Missing id parameter")
       }
     val formattedBatchTime =
-      UIUtils.formatBatchTime(batchTime.milliseconds, streamingListener.batchDuration)
+      SparkUIUtils.formatBatchTime(batchTime.milliseconds, streamingListener.batchDuration)
 
     val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
       throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 31ebb4c..d47287b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -20,78 +20,10 @@ package org.apache.spark.streaming.ui
 import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.mutable.ArrayBuffer
 import scala.xml.{Node, Unparsed}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
-
-/**
- * A helper class to generate JavaScript and HTML for both timeline and histogram graphs.
- *
- * @param timelineDivId the timeline `id` used in the html `div` tag
- * @param histogramDivId the timeline `id` used in the html `div` tag
- * @param data the data for the graph
- * @param minX the min value of X axis
- * @param maxX the max value of X axis
- * @param minY the min value of Y axis
- * @param maxY the max value of Y axis
- * @param unitY the unit of Y axis
- * @param batchInterval if `batchInterval` is not None, we will draw a line for `batchInterval` in
- *                      the graph
- */
-private[ui] class GraphUIData(
-    timelineDivId: String,
-    histogramDivId: String,
-    data: Seq[(Long, Double)],
-    minX: Long,
-    maxX: Long,
-    minY: Double,
-    maxY: Double,
-    unitY: String,
-    batchInterval: Option[Double] = None) {
-
-  private var dataJavaScriptName: String = _
-
-  def generateDataJs(jsCollector: JsCollector): Unit = {
-    val jsForData = data.map { case (x, y) =>
-      s"""{"x": $x, "y": $y}"""
-    }.mkString("[", ",", "]")
-    dataJavaScriptName = jsCollector.nextVariableName
-    jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;")
-  }
-
-  def generateTimelineHtml(jsCollector: JsCollector): Seq[Node] = {
-    jsCollector.addPreparedStatement(s"registerTimeline($minY, $maxY);")
-    if (batchInterval.isDefined) {
-      jsCollector.addStatement(
-        "drawTimeline(" +
-          s"'#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY, '$unitY'," +
-          s" ${batchInterval.get}" +
-          ");")
-    } else {
-      jsCollector.addStatement(
-        s"drawTimeline('#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY," +
-          s" '$unitY');")
-    }
-    <div id={timelineDivId}></div>
-  }
-
-  def generateHistogramHtml(jsCollector: JsCollector): Seq[Node] = {
-    val histogramData = s"$dataJavaScriptName.map(function(d) { return d.y; })"
-    jsCollector.addPreparedStatement(s"registerHistogram($histogramData, $minY, $maxY);")
-    if (batchInterval.isDefined) {
-      jsCollector.addStatement(
-        "drawHistogram(" +
-          s"'#$histogramDivId', $histogramData, $minY, $maxY, '$unitY', ${batchInterval.get}" +
-          ");")
-    } else {
-      jsCollector.addStatement(
-        s"drawHistogram('#$histogramDivId', $histogramData, $minY, $maxY, '$unitY');")
-    }
-    <div id={histogramDivId}></div>
-  }
-}
+import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
 
 /**
  * A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
@@ -164,8 +96,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
   private def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
     // scalastyle:off
     <script src={SparkUIUtils.prependBaseUri(request, "/static/d3.min.js")}></script>
-      <link rel="stylesheet" href={SparkUIUtils.prependBaseUri(request, "/static/streaming/streaming-page.css")} type="text/css"/>
-      <script src={SparkUIUtils.prependBaseUri(request, "/static/streaming/streaming-page.js")}></script>
+      <link rel="stylesheet" href={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.css")} type="text/css"/>
+      <script src={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.js")}></script>
     // scalastyle:on
   }
 
@@ -201,7 +133,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
   private def generateTimeMap(times: Seq[Long]): Seq[Node] = {
     val js = "var timeFormat = {};\n" + times.map { time =>
       val formattedTime =
-        UIUtils.formatBatchTime(time, listener.batchDuration, showYYYYMMSS = false)
+        SparkUIUtils.formatBatchTime(time, listener.batchDuration, showYYYYMMSS = false)
       s"timeFormat[$time] = '$formattedTime';"
     }.mkString("\n")
 
@@ -544,52 +476,3 @@ private[ui] object StreamingPage {
 
 }
 
-/**
- * A helper class that allows the user to add JavaScript statements which will be executed when the
- * DOM has finished loading.
- */
-private[ui] class JsCollector {
-
-  private var variableId = 0
-
-  /**
-   * Return the next unused JavaScript variable name
-   */
-  def nextVariableName: String = {
-    variableId += 1
-    "v" + variableId
-  }
-
-  /**
-   * JavaScript statements that will execute before `statements`
-   */
-  private val preparedStatements = ArrayBuffer[String]()
-
-  /**
-   * JavaScript statements that will execute after `preparedStatements`
-   */
-  private val statements = ArrayBuffer[String]()
-
-  def addPreparedStatement(js: String): Unit = {
-    preparedStatements += js
-  }
-
-  def addStatement(js: String): Unit = {
-    statements += js
-  }
-
-  /**
-   * Generate a html snippet that will execute all scripts when the DOM has finished loading.
-   */
-  def toHtml: Seq[Node] = {
-    val js =
-      s"""
-         |$$(document).ready(function() {
-         |    ${preparedStatements.mkString("\n")}
-         |    ${statements.mkString("\n")}
-         |});""".stripMargin
-
-   <script>{Unparsed(js)}</script>
-  }
-}
-
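
With this change GraphUIData and JsCollector move to core under org.apache.spark.ui (see the new import above), so both streaming UIs share one implementation. A minimal sketch of their render flow, assuming it runs inside a page in the org.apache.spark.ui package (both classes are private[ui]) and using made-up div ids and data:

    import org.apache.spark.ui.{GraphUIData, JsCollector}

    val jsCollector = new JsCollector()
    val data: Seq[(Long, Double)] = Seq((1L, 10.0), (2L, 12.0), (3L, 11.0))
    val graph = new GraphUIData(
      "input-rate-timeline", "input-rate-histogram", data,
      minX = 1L, maxX = 3L, minY = 0.0, maxY = 12.0, unitY = "rows/sec")

    graph.generateDataJs(jsCollector)              // registers "var v1 = [{...}, ...];"
    val html =
      graph.generateTimelineHtml(jsCollector) ++   // <div id="input-rate-timeline"></div>
      graph.generateHistogramHtml(jsCollector) ++  // <div id="input-rate-histogram"></div>
      jsCollector.toHtml                           // script executed once the DOM is ready
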
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index 3ecc448..d616b47 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -28,7 +28,7 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
 private[spark] class StreamingTab(val ssc: StreamingContext, sparkUI: SparkUI)
   extends SparkUITab(sparkUI, "streaming") with Logging {
 
-  private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static"
+  private val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
 
   val parent = sparkUI
   val listener = ssc.progressListener
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index c21912a..dc1af0a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.streaming.ui
 
-import java.text.SimpleDateFormat
-import java.util.{Locale, TimeZone}
 import java.util.concurrent.TimeUnit
 
 import scala.xml.Node
 
 import org.apache.commons.text.StringEscapeUtils
 
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
+
 private[streaming] object UIUtils {
 
   /**
@@ -78,59 +78,6 @@ private[streaming] object UIUtils {
     case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
   }
 
-  // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
-  private val batchTimeFormat = new ThreadLocal[SimpleDateFormat]() {
-    override def initialValue(): SimpleDateFormat =
-      new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.US)
-  }
-
-  private val batchTimeFormatWithMilliseconds = new ThreadLocal[SimpleDateFormat]() {
-    override def initialValue(): SimpleDateFormat =
-      new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US)
-  }
-
-  /**
-   * If `batchInterval` is less than 1 second, format `batchTime` with milliseconds. Otherwise,
-   * format `batchTime` without milliseconds.
-   *
-   * @param batchTime the batch time to be formatted
-   * @param batchInterval the batch interval
-   * @param showYYYYMMSS if showing the `yyyy/MM/dd` part. If it's false, the return value wll be
-   *                     only `HH:mm:ss` or `HH:mm:ss.SSS` depending on `batchInterval`
-   * @param timezone only for test
-   */
-  def formatBatchTime(
-      batchTime: Long,
-      batchInterval: Long,
-      showYYYYMMSS: Boolean = true,
-      timezone: TimeZone = null): String = {
-    val oldTimezones =
-      (batchTimeFormat.get.getTimeZone, batchTimeFormatWithMilliseconds.get.getTimeZone)
-    if (timezone != null) {
-      batchTimeFormat.get.setTimeZone(timezone)
-      batchTimeFormatWithMilliseconds.get.setTimeZone(timezone)
-    }
-    try {
-      val formattedBatchTime =
-        if (batchInterval < 1000) {
-          batchTimeFormatWithMilliseconds.get.format(batchTime)
-        } else {
-          // If batchInterval >= 1 second, don't show milliseconds
-          batchTimeFormat.get.format(batchTime)
-        }
-      if (showYYYYMMSS) {
-        formattedBatchTime
-      } else {
-        formattedBatchTime.substring(formattedBatchTime.indexOf(' ') + 1)
-      }
-    } finally {
-      if (timezone != null) {
-        batchTimeFormat.get.setTimeZone(oldTimezones._1)
-        batchTimeFormatWithMilliseconds.get.setTimeZone(oldTimezones._2)
-      }
-    }
-  }
-
   def createOutputOperationFailureForUI(failure: String): String = {
     if (failure.startsWith("org.apache.spark.Spark")) {
       // SparkException or SparkDriverExecutionException
@@ -164,19 +111,7 @@ private[streaming] object UIUtils {
       } else {
         failureReason
       }
-    val details = if (isMultiline) {
-      // scalastyle:off
-      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
-            class="expand-details">
-        +details
-      </span> ++
-        <div class="stacktrace-details collapsed">
-          <pre>{failureDetails}</pre>
-        </div>
-      // scalastyle:on
-    } else {
-      ""
-    }
+    val details = SparkUIUtils.detailsUINode(isMultiline, failureDetails)
 
     if (rowspan == 1) {
       <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index 1bb4116..36036fc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -19,12 +19,10 @@ package org.apache.spark.streaming
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
 import org.apache.spark.util.ManualClock
 
 /**
@@ -214,7 +212,7 @@ class DStreamScopeSuite
       rddScope: RDDOperationScope,
       batchTime: Long): Unit = {
     val (baseScopeId, baseScopeName) = (baseScope.id, baseScope.name)
-    val formattedBatchTime = UIUtils.formatBatchTime(
+    val formattedBatchTime = SparkUIUtils.formatBatchTime(
       batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
     assert(rddScope.id === s"${baseScopeId}_$batchTime")
     assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
index d3ca2b5..5760837 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import org.scalatest.Matchers
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
 
 class UIUtilsSuite extends SparkFunSuite with Matchers{
 
@@ -70,10 +71,13 @@ class UIUtilsSuite extends SparkFunSuite with Matchers{
   test("formatBatchTime") {
     val tzForTest = TimeZone.getTimeZone("America/Los_Angeles")
     val batchTime = 1431637480452L // Thu May 14 14:04:40 PDT 2015
-    assert("2015/05/14 14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, timezone = tzForTest))
+    assert("2015/05/14 14:04:40" ===
+      SparkUIUtils.formatBatchTime(batchTime, 1000, timezone = tzForTest))
     assert("2015/05/14 14:04:40.452" ===
-      UIUtils.formatBatchTime(batchTime, 999, timezone = tzForTest))
-    assert("14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, false, timezone = tzForTest))
-    assert("14:04:40.452" === UIUtils.formatBatchTime(batchTime, 999, false, timezone = tzForTest))
+      SparkUIUtils.formatBatchTime(batchTime, 999, timezone = tzForTest))
+    assert("14:04:40" ===
+      SparkUIUtils.formatBatchTime(batchTime, 1000, false, timezone = tzForTest))
+    assert("14:04:40.452" ===
+      SparkUIUtils.formatBatchTime(batchTime, 999, false, timezone = tzForTest))
   }
 }

