You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2020/01/29 21:44:34 UTC
[spark] branch master updated: [SPARK-29543][SS][UI] Structured
Streaming Web UI
This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7173786 [SPARK-29543][SS][UI] Structured Streaming Web UI
7173786 is described below
commit 71737861531180bbda9aec8d241b1428fe91cab2
Author: uncleGen <hu...@gmail.com>
AuthorDate: Wed Jan 29 13:43:51 2020 -0800
[SPARK-29543][SS][UI] Structured Streaming Web UI
### What changes were proposed in this pull request?
This PR adds two pages to Web UI for Structured Streaming:
- "/streamingquery": Streaming Query Page, providing some aggregate information for running/completed streaming queries.
- "/streamingquery/statistics": Streaming Query Statistics Page, providing detailed information for streaming query, including `Input Rate`, `Process Rate`, `Input Rows`, `Batch Duration` and `Operation Duration`
![Screen Shot 2020-01-29 at 1 38 00 PM](https://user-images.githubusercontent.com/1000778/73399837-cd01cc80-429c-11ea-9d4b-1d200a41b8d5.png)
![Screen Shot 2020-01-29 at 1 39 16 PM](https://user-images.githubusercontent.com/1000778/73399838-cd01cc80-429c-11ea-8185-4e56db6866bd.png)
### Why are the changes needed?
It helps users to better monitor Structured Streaming query.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
- new added and existing UTs
- manual test
Closes #26201 from uncleGen/SPARK-29543.
Lead-authored-by: uncleGen <hu...@gmail.com>
Co-authored-by: Yuanjian Li <xy...@gmail.com>
Co-authored-by: Genmao Yu <hu...@gmail.com>
Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
.../org/apache/spark}/ui/static/streaming-page.css | 0
.../org/apache/spark}/ui/static/streaming-page.js | 0
.../spark/ui/static/structured-streaming-page.js | 171 +++++++++++++
.../resources/org/apache/spark/ui/static/webui.js | 2 +
.../scala/org/apache/spark/ui/GraphUIData.scala | 169 +++++++++++++
.../main/scala/org/apache/spark/ui/UIUtils.scala | 91 +++++++
.../scala/org/apache/spark/ui/jobs/StagePage.scala | 14 +-
.../org/apache/spark/ui/jobs/StageTable.scala | 14 +-
project/MimaExcludes.scala | 5 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 16 ++
.../sql/execution/streaming/ProgressReporter.scala | 5 +-
.../sql/execution/streaming/StreamExecution.scala | 3 +-
.../apache/spark/sql/internal/SharedState.scala | 19 +-
.../sql/streaming/StreamingQueryListener.scala | 4 +-
.../sql/streaming/StreamingQueryManager.scala | 6 +-
.../org/apache/spark/sql/streaming/progress.scala | 2 +
.../sql/streaming/ui/StreamingQueryPage.scala | 147 +++++++++++
.../ui/StreamingQueryStatisticsPage.scala | 271 +++++++++++++++++++++
.../ui/StreamingQueryStatusListener.scala | 122 ++++++++++
.../spark/sql/streaming/ui/StreamingQueryTab.scala | 33 +--
.../apache/spark/sql/streaming/ui/UIUtils.scala | 60 +++++
.../streaming/StreamingQueryListenerSuite.scala | 10 +-
.../StreamingQueryStatusAndProgressSuite.scala | 2 +
.../spark/sql/streaming/StreamingQuerySuite.scala | 14 +-
.../sql/streaming/ui/StreamingQueryPageSuite.scala | 125 ++++++++++
.../ui/StreamingQueryStatusListenerSuite.scala | 101 ++++++++
.../spark/sql/streaming/ui/UIUtilsSuite.scala | 41 ++++
.../hive/thriftserver/ui/ThriftServerPage.scala | 16 +-
.../apache/spark/streaming/dstream/DStream.scala | 4 +-
.../spark/streaming/scheduler/JobScheduler.scala | 4 +-
.../spark/streaming/ui/AllBatchesTable.scala | 2 +-
.../org/apache/spark/streaming/ui/BatchPage.scala | 2 +-
.../apache/spark/streaming/ui/StreamingPage.scala | 125 +---------
.../apache/spark/streaming/ui/StreamingTab.scala | 2 +-
.../org/apache/spark/streaming/ui/UIUtils.scala | 71 +-----
.../apache/spark/streaming/DStreamScopeSuite.scala | 6 +-
.../apache/spark/streaming/ui/UIUtilsSuite.scala | 12 +-
37 files changed, 1408 insertions(+), 283 deletions(-)
diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.css b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
similarity index 100%
rename from streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.css
rename to core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
similarity index 100%
rename from streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
rename to core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
diff --git a/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
new file mode 100644
index 0000000..70250fd
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// pre-define some colors for legends.
+var colorPool = ["#F8C471", "#F39C12", "#B9770E", "#73C6B6", "#16A085", "#117A65", "#B2BABB", "#7F8C8D", "#616A6B"];
+
+function drawAreaStack(id, labels, values, minX, maxX, minY, maxY) {
+ d3.select(d3.select(id).node().parentNode)
+ .style("padding", "8px 0 8px 8px")
+ .style("border-right", "0px solid white");
+
+ // Setup svg using Bostock's margin convention
+ var margin = {top: 20, right: 40, bottom: 30, left: maxMarginLeftForTimeline};
+ var width = 850 - margin.left - margin.right;
+ var height = 300 - margin.top - margin.bottom;
+
+ var svg = d3.select(id)
+ .append("svg")
+ .attr("width", width + margin.left + margin.right)
+ .attr("height", height + margin.top + margin.bottom)
+ .append("g")
+ .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
+
+ var data = values;
+
+ var parse = d3.time.format("%H:%M:%S.%L").parse;
+
+ // Transpose the data into layers
+ var dataset = d3.layout.stack()(labels.map(function(fruit) {
+ return data.map(function(d) {
+ return {_x: d.x, x: parse(d.x), y: +d[fruit]};
+ });
+ }));
+
+
+ // Set x, y and colors
+ var x = d3.scale.ordinal()
+ .domain(dataset[0].map(function(d) { return d.x; }))
+ .rangeRoundBands([10, width-10], 0.02);
+
+ var y = d3.scale.linear()
+ .domain([0, d3.max(dataset, function(d) { return d3.max(d, function(d) { return d.y0 + d.y; }); })])
+ .range([height, 0]);
+
+ var colors = colorPool.slice(0, labels.length)
+
+ // Define and draw axes
+ var yAxis = d3.svg.axis()
+ .scale(y)
+ .orient("left")
+ .ticks(7)
+ .tickFormat( function(d) { return d } );
+
+ var xAxis = d3.svg.axis()
+ .scale(x)
+ .orient("bottom")
+ .tickFormat(d3.time.format("%H:%M:%S.%L"));
+
+ // Only show the first and last time in the graph
+ var xline = []
+ xline.push(x.domain()[0])
+ xline.push(x.domain()[x.domain().length - 1])
+ xAxis.tickValues(xline);
+
+ svg.append("g")
+ .attr("class", "y axis")
+ .call(yAxis)
+ .append("text")
+ .attr("transform", "translate(0," + unitLabelYOffset + ")")
+ .text("ms");
+
+ svg.append("g")
+ .attr("class", "x axis")
+ .attr("transform", "translate(0," + height + ")")
+ .call(xAxis);
+
+ // Create groups for each series, rects for each segment
+ var groups = svg.selectAll("g.cost")
+ .data(dataset)
+ .enter().append("g")
+ .attr("class", "cost")
+ .style("fill", function(d, i) { return colors[i]; });
+
+ var rect = groups.selectAll("rect")
+ .data(function(d) { return d; })
+ .enter()
+ .append("rect")
+ .attr("x", function(d) { return x(d.x); })
+ .attr("y", function(d) { return y(d.y0 + d.y); })
+ .attr("height", function(d) { return y(d.y0) - y(d.y0 + d.y); })
+ .attr("width", x.rangeBand())
+ .on('mouseover', function(d) {
+ var tip = '';
+ var idx = 0;
+ var _values = timeToValues[d._x]
+ _values.forEach(function (k) {
+ tip += labels[idx] + ': ' + k + ' ';
+ idx += 1;
+ });
+ tip += " at " + d._x
+ showBootstrapTooltip(d3.select(this).node(), tip);
+ })
+ .on('mouseout', function() {
+ hideBootstrapTooltip(d3.select(this).node());
+ })
+ .on("mousemove", function(d) {
+ var xPosition = d3.mouse(this)[0] - 15;
+ var yPosition = d3.mouse(this)[1] - 25;
+ tooltip.attr("transform", "translate(" + xPosition + "," + yPosition + ")");
+ tooltip.select("text").text(d.y);
+ });
+
+
+ // Draw legend
+ var legend = svg.selectAll(".legend")
+ .data(colors)
+ .enter().append("g")
+ .attr("class", "legend")
+ .attr("transform", function(d, i) { return "translate(30," + i * 19 + ")"; });
+
+ legend.append("rect")
+ .attr("x", width - 20)
+ .attr("width", 18)
+ .attr("height", 18)
+ .style("fill", function(d, i) {return colors.slice().reverse()[i];})
+ .on('mouseover', function(d, i) {
+ var len = labels.length
+ showBootstrapTooltip(d3.select(this).node(), labels[len - 1 - i]);
+ })
+ .on('mouseout', function() {
+ hideBootstrapTooltip(d3.select(this).node());
+ })
+ .on("mousemove", function(d) {
+ var xPosition = d3.mouse(this)[0] - 15;
+ var yPosition = d3.mouse(this)[1] - 25;
+ tooltip.attr("transform", "translate(" + xPosition + "," + yPosition + ")");
+ tooltip.select("text").text(d.y);
+ });
+
+ // Prep the tooltip bits, initial display is hidden
+ var tooltip = svg.append("g")
+ .attr("class", "tooltip")
+ .style("display", "none");
+
+ tooltip.append("rect")
+ .attr("width", 30)
+ .attr("height", 20)
+ .attr("fill", "white")
+ .style("opacity", 0.5);
+
+ tooltip.append("text")
+ .attr("x", 15)
+ .attr("dy", "1.2em")
+ .style("text-anchor", "middle")
+ .attr("font-size", "12px")
+ .attr("font-weight", "bold");
+}
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js
index fac464e..0ba461f 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -90,6 +90,8 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-sessionstat','aggregated-sessionstat');
collapseTablePageLoad('collapse-aggregated-sqlstat','aggregated-sqlstat');
collapseTablePageLoad('collapse-aggregated-sqlsessionstat','aggregated-sqlsessionstat');
+ collapseTablePageLoad('collapse-aggregated-activeQueries','aggregated-activeQueries');
+ collapseTablePageLoad('collapse-aggregated-completedQueries','aggregated-completedQueries');
});
$(function() {
diff --git a/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala b/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala
new file mode 100644
index 0000000..87ff677
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import java.{util => ju}
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.{Node, Unparsed}
+
+/**
+ * A helper class to generate JavaScript and HTML for both timeline and histogram graphs.
+ *
+ * @param timelineDivId the timeline `id` used in the html `div` tag
+ * @param histogramDivId the histogram `id` used in the html `div` tag
+ * @param data the data for the graph
+ * @param minX the min value of X axis
+ * @param maxX the max value of X axis
+ * @param minY the min value of Y axis
+ * @param maxY the max value of Y axis
+ * @param unitY the unit of Y axis
+ * @param batchInterval if `batchInterval` is not None, we will draw a line for `batchInterval` in
+ * the graph
+ */
+private[spark] class GraphUIData(
+ timelineDivId: String,
+ histogramDivId: String,
+ data: Seq[(Long, Double)],
+ minX: Long,
+ maxX: Long,
+ minY: Double,
+ maxY: Double,
+ unitY: String,
+ batchInterval: Option[Double] = None) {
+
+ private var dataJavaScriptName: String = _
+
+ def generateDataJs(jsCollector: JsCollector): Unit = {
+ val jsForData = data.map { case (x, y) =>
+ s"""{"x": $x, "y": $y}"""
+ }.mkString("[", ",", "]")
+ dataJavaScriptName = jsCollector.nextVariableName
+ jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;")
+ }
+
+ def generateTimelineHtml(jsCollector: JsCollector): Seq[Node] = {
+ jsCollector.addPreparedStatement(s"registerTimeline($minY, $maxY);")
+ if (batchInterval.isDefined) {
+ jsCollector.addStatement(
+ "drawTimeline(" +
+ s"'#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY, '$unitY'," +
+ s" ${batchInterval.get}" +
+ ");")
+ } else {
+ jsCollector.addStatement(
+ s"drawTimeline('#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY," +
+ s" '$unitY');")
+ }
+ <div id={timelineDivId}></div>
+ }
+
+ def generateHistogramHtml(jsCollector: JsCollector): Seq[Node] = {
+ val histogramData = s"$dataJavaScriptName.map(function(d) { return d.y; })"
+ jsCollector.addPreparedStatement(s"registerHistogram($histogramData, $minY, $maxY);")
+ if (batchInterval.isDefined) {
+ jsCollector.addStatement(
+ "drawHistogram(" +
+ s"'#$histogramDivId', $histogramData, $minY, $maxY, '$unitY', ${batchInterval.get}" +
+ ");")
+ } else {
+ jsCollector.addStatement(
+ s"drawHistogram('#$histogramDivId', $histogramData, $minY, $maxY, '$unitY');")
+ }
+ <div id={histogramDivId}></div>
+ }
+
+ def generateAreaStackHtmlWithData(
+ jsCollector: JsCollector,
+ values: Array[(Long, ju.Map[String, JLong])]): Seq[Node] = {
+ val operationLabels = values.flatMap(_._2.keySet().asScala).toSet
+ val durationDataPadding = UIUtils.durationDataPadding(values)
+ val jsForData = durationDataPadding.map { case (x, y) =>
+ val s = y.toSeq.sortBy(_._1).map(e => s""""${e._1}": "${e._2}"""").mkString(",")
+ s"""{x: "${UIUtils.formatBatchTime(x, 1, showYYYYMMSS = false)}", $s}"""
+ }.mkString("[", ",", "]")
+ val jsForLabels = operationLabels.toSeq.sorted.mkString("[\"", "\",\"", "\"]")
+
+ val (maxX, minX, maxY, minY) = if (values != null && values.length > 0) {
+ val xValues = values.map(_._1.toLong)
+ val yValues = values.map(_._2.asScala.toSeq.map(_._2.toLong).sum)
+ (xValues.max, xValues.min, yValues.max, yValues.min)
+ } else {
+ (0L, 0L, 0L, 0L)
+ }
+
+ dataJavaScriptName = jsCollector.nextVariableName
+ jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;")
+ val labels = jsCollector.nextVariableName
+ jsCollector.addPreparedStatement(s"var $labels = $jsForLabels;")
+ jsCollector.addStatement(
+ s"drawAreaStack('#$timelineDivId', $labels, $dataJavaScriptName, $minX, $maxX, $minY, $maxY)")
+ <div id={timelineDivId}></div>
+ }
+}
+
+/**
+ * A helper class that allows the user to add JavaScript statements which will be executed when the
+ * DOM has finished loading.
+ */
+private[spark] class JsCollector {
+
+ private var variableId = 0
+
+ /**
+ * Return the next unused JavaScript variable name
+ */
+ def nextVariableName: String = {
+ variableId += 1
+ "v" + variableId
+ }
+
+ /**
+ * JavaScript statements that will execute before `statements`
+ */
+ private val preparedStatements = ArrayBuffer[String]()
+
+ /**
+ * JavaScript statements that will execute after `preparedStatements`
+ */
+ private val statements = ArrayBuffer[String]()
+
+ def addPreparedStatement(js: String): Unit = {
+ preparedStatements += js
+ }
+
+ def addStatement(js: String): Unit = {
+ statements += js
+ }
+
+ /**
+ * Generate a html snippet that will execute all scripts when the DOM has finished loading.
+ */
+ def toHtml: Seq[Node] = {
+ val js =
+ s"""
+ |$$(document).ready(function() {
+ | ${preparedStatements.mkString("\n")}
+ | ${statements.mkString("\n")}
+ |});""".stripMargin
+
+ <script>{Unparsed(js)}</script>
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 143303d..94c4521 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.ui
+import java.{util => ju}
+import java.lang.{Long => JLong}
import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
import java.text.SimpleDateFormat
@@ -24,6 +26,7 @@ import java.util.{Date, Locale, TimeZone}
import javax.servlet.http.HttpServletRequest
import javax.ws.rs.core.{MediaType, Response}
+import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.xml._
import scala.xml.transform.{RewriteRule, RuleTransformer}
@@ -119,6 +122,59 @@ private[spark] object UIUtils extends Logging {
}
}
+ // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+ private val batchTimeFormat = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat =
+ new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.US)
+ }
+
+ private val batchTimeFormatWithMilliseconds = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat =
+ new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US)
+ }
+
+ /**
+ * If `batchInterval` is less than 1 second, format `batchTime` with milliseconds. Otherwise,
+ * format `batchTime` without milliseconds.
+ *
+ * @param batchTime the batch time to be formatted
+ * @param batchInterval the batch interval
+ * @param showYYYYMMSS if showing the `yyyy/MM/dd` part. If it's false, the return value will be
+ * only `HH:mm:ss` or `HH:mm:ss.SSS` depending on `batchInterval`
+ * @param timezone only for test
+ */
+ def formatBatchTime(
+ batchTime: Long,
+ batchInterval: Long,
+ showYYYYMMSS: Boolean = true,
+ timezone: TimeZone = null): String = {
+ val oldTimezones =
+ (batchTimeFormat.get.getTimeZone, batchTimeFormatWithMilliseconds.get.getTimeZone)
+ if (timezone != null) {
+ batchTimeFormat.get.setTimeZone(timezone)
+ batchTimeFormatWithMilliseconds.get.setTimeZone(timezone)
+ }
+ try {
+ val formattedBatchTime =
+ if (batchInterval < 1000) {
+ batchTimeFormatWithMilliseconds.get.format(batchTime)
+ } else {
+ // If batchInterval >= 1 second, don't show milliseconds
+ batchTimeFormat.get.format(batchTime)
+ }
+ if (showYYYYMMSS) {
+ formattedBatchTime
+ } else {
+ formattedBatchTime.substring(formattedBatchTime.indexOf(' ') + 1)
+ }
+ } finally {
+ if (timezone != null) {
+ batchTimeFormat.get.setTimeZone(oldTimezones._1)
+ batchTimeFormatWithMilliseconds.get.setTimeZone(oldTimezones._2)
+ }
+ }
+ }
+
/** Generate a human-readable string representing a number (e.g. 100 K) */
def formatNumber(records: Double): String = {
val trillion = 1e12
@@ -572,4 +628,39 @@ private[spark] object UIUtils extends Logging {
def buildErrorResponse(status: Response.Status, msg: String): Response = {
Response.status(status).entity(msg).`type`(MediaType.TEXT_PLAIN).build()
}
+
+ /**
+ * There may be different duration labels in each batch. So we need to
+ * mark those missing duration labels as '0d' to avoid UI rendering errors.
+ */
+ def durationDataPadding(
+ values: Array[(Long, ju.Map[String, JLong])]): Array[(Long, Map[String, Double])] = {
+ val operationLabels = values.flatMap(_._2.keySet().asScala).toSet
+ values.map { case (xValue, yValue) =>
+ val dataPadding = operationLabels.map { opLabel =>
+ if (yValue.containsKey(opLabel)) {
+ (opLabel, yValue.get(opLabel).toDouble)
+ } else {
+ (opLabel, 0d)
+ }
+ }
+ (xValue, dataPadding.toMap)
+ }
+ }
+
+ def detailsUINode(isMultiline: Boolean, message: String): Seq[Node] = {
+ if (isMultiline) {
+ // scalastyle:off
+ <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+ class="expand-details">
+ +details
+ </span> ++
+ <div class="stacktrace-details collapsed">
+ <pre>{message}</pre>
+ </div>
+ // scalastyle:on
+ } else {
+ Seq.empty[Node]
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 4dc5349..ccaa70b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -721,19 +721,7 @@ private[ui] class TaskPagedTable(
} else {
error
})
- val details = if (isMultiline) {
- // scalastyle:off
- <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
- class="expand-details">
- +details
- </span> ++
- <div class="stacktrace-details collapsed">
- <pre>{error}</pre>
- </div>
- // scalastyle:on
- } else {
- ""
- }
+ val details = UIUtils.detailsUINode(isMultiline, error)
<td>{errorSummary}{details}</td>
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index ac431c9..a7d38e9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -309,19 +309,7 @@ private[ui] class StagePagedTable(
} else {
failureReason
})
- val details = if (isMultiline) {
- // scalastyle:off
- <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
- class="expand-details">
- +details
- </span> ++
- <div class="stacktrace-details collapsed">
- <pre>{failureReason}</pre>
- </div>
- // scalastyle:on
- } else {
- ""
- }
+ val details = UIUtils.detailsUINode(isMultiline, failureReason)
<td valign="middle">{failureReasonSummary}{details}</td>
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 68e9313..65ffa22 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -489,7 +489,10 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegressionModel.setPredictionCol"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setFeaturesCol"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setLabelCol"),
- ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setPredictionCol")
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setPredictionCol"),
+
+ // [SPARK-29543][SS][UI] Init structured streaming ui
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStartedEvent.this")
)
// Exclude rules for 2.4.x
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0e0a814..e13d65b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1150,6 +1150,18 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STREAMING_UI_ENABLED =
+ buildConf("spark.sql.streaming.ui.enabled")
+ .doc("Whether to run the structured streaming UI for the Spark application.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val STREAMING_UI_INACTIVE_QUERY_RETENTION =
+ buildConf("spark.sql.streaming.ui.numInactiveQueries")
+ .doc("The number of inactive queries to retain for structured streaming ui.")
+ .intConf
+ .createWithDefault(100)
+
val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like ${var} ${system:var} and ${env:var}.")
@@ -2262,6 +2274,10 @@ class SQLConf extends Serializable with Logging {
def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
+ def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED)
+
+ def streamingUIInactiveQueryRetention: Int = getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION)
+
def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 71bcd53..f20291e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -148,8 +148,8 @@ trait ProgressReporter extends Logging {
currentTriggerEndTimestamp = triggerClock.getTimeMillis()
val executionStats = extractExecutionStats(hasNewData)
- val processingTimeSec = Math.max(1L,
- currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
+ val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
+ val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND
val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
(currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
@@ -181,6 +181,7 @@ trait ProgressReporter extends Logging {
name = name,
timestamp = formatTimestamp(currentTriggerStartTimestamp),
batchId = currentBatchId,
+ batchDuration = processingTimeMills,
durationMs = new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).asJava),
eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
stateOperators = executionStats.stateOperators.toArray,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6dff5c6..ed908a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -307,7 +307,8 @@ abstract class StreamExecution(
}
// `postEvent` does not throw non fatal exception.
- postEvent(new QueryStartedEvent(id, runId, name))
+ val submissionTime = triggerClock.getTimeMillis()
+ postEvent(new QueryStartedEvent(id, runId, name, submissionTime))
// Unblock starting thread
startLatch.countDown()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index de3805e..fefd72d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.internal
import java.net.URL
-import java.util.{Locale, UUID}
+import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.GuardedBy
@@ -36,6 +36,8 @@ import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.sql.streaming.StreamingQueryListener
+import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.Utils
@@ -139,6 +141,21 @@ private[sql] class SharedState(
}
/**
+ * A [[StreamingQueryListener]] for the Structured Streaming UI; it holds all streaming query
+ * UI data to show.
+ */
+ lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
+ val sqlConf = SQLConf.get
+ if (sqlConf.isStreamingUIEnabled) {
+ val statusListener = new StreamingQueryStatusListener(sqlConf)
+ sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _))
+ Some(statusListener)
+ } else {
+ None
+ }
+ }
+
+ /**
* A catalog that interacts with external systems.
*/
lazy val externalCatalog: ExternalCatalogWithListener = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index cc81cf6..dd842cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -82,13 +82,15 @@ object StreamingQueryListener {
* @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
* @param name User-specified name of the query, null if not specified.
+ * @param submissionTime The timestamp at which the query was started.
* @since 2.1.0
*/
@Evolving
class QueryStartedEvent private[sql](
val id: UUID,
val runId: UUID,
- val name: String) extends Event
+ val name: String,
+ val submissionTime: Long) extends Event
/**
* Event representing any progress updates in a query.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 810f4a1..4d0d8ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
@@ -37,7 +38,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* A class to manage all the [[StreamingQuery]] active in a `SparkSession`.
@@ -68,6 +69,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
logInfo(s"Registered listener ${listener.getClass.getName}")
})
}
+ sparkSession.sharedState.streamingQueryStatusListener.foreach { listener =>
+ addListener(listener)
+ }
} catch {
case e: Exception =>
throw new SparkException("Exception when registering StreamingQueryListener", e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index a9681db..13b506b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -85,6 +85,7 @@ class StateOperatorProgress private[sql](
 * case of retries after a failure a given batchId may be executed more than once.
* Similarly, when there is no data to be processed, the batchId will not be
* incremented.
+ * @param batchDuration The duration taken to process this batch, in milliseconds.
* @param durationMs The amount of time taken to perform various operations in milliseconds.
* @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
* {{{
@@ -105,6 +106,7 @@ class StreamingQueryProgress private[sql](
val name: String,
val timestamp: String,
val batchId: Long,
+ val batchDuration: Long,
val durationMs: ju.Map[String, JLong],
val eventTime: ju.Map[String, String],
val stateOperators: Array[StateOperatorProgress],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
new file mode 100644
index 0000000..650f64f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.text.SimpleDateFormat
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+import org.apache.spark.sql.streaming.ui.UIUtils._
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
+
+private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
+ extends WebUIPage("") with Logging {
+ val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ df.setTimeZone(getTimeZone("UTC"))
+
+ override def render(request: HttpServletRequest): Seq[Node] = {
+ val content = generateStreamingQueryTable(request)
+ SparkUIUtils.headerSparkPage(request, "Streaming Query", content, parent)
+ }
+
+ def generateDataRow(request: HttpServletRequest, queryActive: Boolean)
+ (query: StreamingQueryUIData): Seq[Node] = {
+
+ def details(detail: Any): Seq[Node] = {
+ if (queryActive) {
+ return Seq.empty[Node]
+ }
+ val detailString = detail.asInstanceOf[String]
+ val isMultiline = detailString.indexOf('\n') >= 0
+ val summary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) detailString.substring(0, detailString.indexOf('\n')) else detailString
+ )
+ val details = SparkUIUtils.detailsUINode(isMultiline, detailString)
+ <td>{summary}{details}</td>
+ }
+
+ val statisticsLink = "%s/%s/statistics?id=%s"
+ .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, query.runId)
+
+ val name = UIUtils.getQueryName(query)
+ val status = UIUtils.getQueryStatus(query)
+ val duration = if (queryActive) {
+ SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
+ } else {
+ withNoProgress(query, {
+ val endTimeMs = query.lastProgress.timestamp
+ SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submissionTime)
+ }, "-")
+ }
+
+ <tr>
+ <td> {name} </td>
+ <td> {status} </td>
+ <td> {query.id} </td>
+ <td> <a href={statisticsLink}> {query.runId} </a> </td>
+ <td> {SparkUIUtils.formatDate(query.submissionTime)} </td>
+ <td> {duration} </td>
+ <td> {withNoProgress(query, {
+ (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
+ query.recentProgress.length).formatted("%.2f") }, "NaN")}
+ </td>
+ <td> {withNoProgress(query, {
+ (query.recentProgress.map(p => withNumberInvalid(p.processedRowsPerSecond)).sum /
+ query.recentProgress.length).formatted("%.2f") }, "NaN")}
+ </td>
+ <td> {withNoProgress(query, { query.lastProgress.batchId }, "NaN")} </td>
+ {details(query.exception.getOrElse("-"))}
+ </tr>
+ }
+
+ private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
+ val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
+ .partition(_.isActive)
+ val activeQueryTables = if (activeQueries.nonEmpty) {
+ val headerRow = Seq(
+ "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec",
+ "Avg Process /sec", "Latest Batch")
+
+ Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true),
+ activeQueries, true, None, Seq(null), false))
+ } else {
+ None
+ }
+
+ val inactiveQueryTables = if (inactiveQueries.nonEmpty) {
+ val headerRow = Seq(
+ "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec",
+ "Avg Process /sec", "Latest Batch", "Error")
+
+ Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false),
+ inactiveQueries, true, None, Seq(null), false))
+ } else {
+ None
+ }
+
+ // scalastyle:off
+ val content =
+ <span id="completed" class="collapse-aggregated-activeQueries collapse-table"
+ onClick="collapseTable('collapse-aggregated-activeQueries','aggregated-activeQueries')">
+ <h5 id="activequeries">
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Active Streaming Queries ({activeQueries.length})</a>
+ </h5>
+ </span> ++
+ <div>
+ <ul class="aggregated-activeQueries collapsible-table">
+ {activeQueryTables.getOrElse(Seq.empty[Node])}
+ </ul>
+ </div> ++
+ <span id="completed" class="collapse-aggregated-completedQueries collapse-table"
+ onClick="collapseTable('collapse-aggregated-completedQueries','aggregated-completedQueries')">
+ <h5 id="completedqueries">
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Completed Streaming Queries ({inactiveQueries.length})</a>
+ </h5>
+ </span> ++
+ <div>
+ <ul class="aggregated-completedQueries collapsible-table">
+ {inactiveQueryTables.getOrElse(Seq.empty[Node])}
+ </ul>
+ </div>
+ // scalastyle:on
+
+ content
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
new file mode 100644
index 0000000..56672ce
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.text.SimpleDateFormat
+import java.util.UUID
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, Unparsed}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+import org.apache.spark.sql.streaming.ui.UIUtils._
+import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
+
+private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
+ extends WebUIPage("statistics") with Logging {
+ val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ df.setTimeZone(getTimeZone("UTC"))
+
+ def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
+ // scalastyle:off
+ <script src={SparkUIUtils.prependBaseUri(request, "/static/d3.min.js")}></script>
+ <link rel="stylesheet" href={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.css")} type="text/css"/>
+ <script src={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.js")}></script>
+ <script src={SparkUIUtils.prependBaseUri(request, "/static/structured-streaming-page.js")}></script>
+ // scalastyle:on
+ }
+
+ override def render(request: HttpServletRequest): Seq[Node] = {
+ val parameterId = request.getParameter("id")
+ require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+ val query = parent.statusListener.allQueryStatus.find { case q =>
+ q.runId.equals(UUID.fromString(parameterId))
+ }.getOrElse(throw new IllegalArgumentException(s"Failed to find streaming query $parameterId"))
+
+ val resources = generateLoadResources(request)
+ val basicInfo = generateBasicInfo(query)
+ val content =
+ resources ++
+ basicInfo ++
+ generateStatTable(query)
+ SparkUIUtils.headerSparkPage(request, "Streaming Query Statistics", content, parent)
+ }
+
+ def generateTimeMap(times: Seq[Long]): Seq[Node] = {
+ val js = "var timeFormat = {};\n" + times.map { time =>
+ val formattedTime = SparkUIUtils.formatBatchTime(time, 1, showYYYYMMSS = false)
+ s"timeFormat[$time] = '$formattedTime';"
+ }.mkString("\n")
+
+ <script>{Unparsed(js)}</script>
+ }
+
+ def generateVar(values: Array[(Long, ju.Map[String, JLong])]): Seq[Node] = {
+ val durationDataPadding = SparkUIUtils.durationDataPadding(values)
+ val js = "var timeToValues = {};\n" + durationDataPadding.map { case (x, y) =>
+ val s = y.toSeq.sortBy(_._1).map(e => s""""${e._2}"""").mkString("[", ",", "]")
+ s"""timeToValues["${SparkUIUtils.formatBatchTime(x, 1, showYYYYMMSS = false)}"] = $s;"""
+ }.mkString("\n")
+
+ <script>{Unparsed(js)}</script>
+ }
+
+ def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
+ val duration = if (query.isActive) {
+ SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
+ } else {
+ withNoProgress(query, {
+ val end = query.lastProgress.timestamp
+ val start = query.recentProgress.head.timestamp
+ SparkUIUtils.formatDurationVerbose(
+ df.parse(end).getTime - df.parse(start).getTime)
+ }, "-")
+ }
+
+ val name = UIUtils.getQueryName(query)
+ val numBatches = withNoProgress(query, { query.lastProgress.batchId + 1L }, 0)
+ <div>Running batches for
+ <strong>
+ {duration}
+ </strong>
+ since
+ <strong>
+ {SparkUIUtils.formatDate(query.submissionTime)}
+ </strong>
+ (<strong>{numBatches}</strong> completed batches)
+ </div>
+ <br />
+ <div><strong>Name: </strong>{name}</div>
+ <div><strong>Id: </strong>{query.id}</div>
+ <div><strong>RunId: </strong>{query.runId}</div>
+ <br />
+ }
+
+ def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
+ val batchTimes = withNoProgress(query,
+ query.recentProgress.map(p => df.parse(p.timestamp).getTime), Array.empty[Long])
+ val minBatchTime =
+ withNoProgress(query, df.parse(query.recentProgress.head.timestamp).getTime, 0L)
+ val maxBatchTime =
+ withNoProgress(query, df.parse(query.lastProgress.timestamp).getTime, 0L)
+ val maxRecordRate =
+ withNoProgress(query, query.recentProgress.map(_.inputRowsPerSecond).max, 0L)
+ val minRecordRate = 0L
+ val maxProcessRate =
+ withNoProgress(query, query.recentProgress.map(_.processedRowsPerSecond).max, 0L)
+
+ val minProcessRate = 0L
+ val maxRows = withNoProgress(query, query.recentProgress.map(_.numInputRows).max, 0L)
+ val minRows = 0L
+ val maxBatchDuration = withNoProgress(query, query.recentProgress.map(_.batchDuration).max, 0L)
+ val minBatchDuration = 0L
+
+ val inputRateData = withNoProgress(query,
+ query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+ withNumberInvalid { p.inputRowsPerSecond })), Array.empty[(Long, Double)])
+ val processRateData = withNoProgress(query,
+ query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+ withNumberInvalid { p.processedRowsPerSecond })), Array.empty[(Long, Double)])
+ val inputRowsData = withNoProgress(query,
+ query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+ withNumberInvalid { p.numInputRows })), Array.empty[(Long, Double)])
+ val batchDurations = withNoProgress(query,
+ query.recentProgress.map(p => (df.parse(p.timestamp).getTime,
+ withNumberInvalid { p.batchDuration })), Array.empty[(Long, Double)])
+ val operationDurationData = withNoProgress(query, query.recentProgress.map { p =>
+ val durationMs = p.durationMs
+ // remove "triggerExecution" as it covers the duration of all the other operations.
+ durationMs.remove("triggerExecution")
+ (df.parse(p.timestamp).getTime, durationMs)}, Array.empty[(Long, ju.Map[String, JLong])])
+
+ val jsCollector = new JsCollector
+ val graphUIDataForInputRate =
+ new GraphUIData(
+ "input-rate-timeline",
+ "input-rate-histogram",
+ inputRateData,
+ minBatchTime,
+ maxBatchTime,
+ minRecordRate,
+ maxRecordRate,
+ "records/sec")
+ graphUIDataForInputRate.generateDataJs(jsCollector)
+
+ val graphUIDataForProcessRate =
+ new GraphUIData(
+ "process-rate-timeline",
+ "process-rate-histogram",
+ processRateData,
+ minBatchTime,
+ maxBatchTime,
+ minProcessRate,
+ maxProcessRate,
+ "records/sec")
+ graphUIDataForProcessRate.generateDataJs(jsCollector)
+
+ val graphUIDataForInputRows =
+ new GraphUIData(
+ "input-rows-timeline",
+ "input-rows-histogram",
+ inputRowsData,
+ minBatchTime,
+ maxBatchTime,
+ minRows,
+ maxRows,
+ "records")
+ graphUIDataForInputRows.generateDataJs(jsCollector)
+
+ val graphUIDataForBatchDuration =
+ new GraphUIData(
+ "batch-duration-timeline",
+ "batch-duration-histogram",
+ batchDurations,
+ minBatchTime,
+ maxBatchTime,
+ minBatchDuration,
+ maxBatchDuration,
+ "ms")
+ graphUIDataForBatchDuration.generateDataJs(jsCollector)
+
+ val graphUIDataForDuration =
+ new GraphUIData(
+ "duration-area-stack",
+ "",
+ Seq.empty[(Long, Double)],
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ "ms")
+
+ val table =
+ // scalastyle:off
+ <table id="stat-table" class="table table-bordered" style="width: auto">
+ <thead>
+ <tr>
+ <th style="width: 160px;"></th>
+ <th style="width: 492px;">Timelines</th>
+ <th style="width: 350px;">Histograms</th></tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td style="vertical-align: middle;">
+ <div style="width: 160px;">
+ <div><strong>Input Rate {SparkUIUtils.tooltip("The aggregate (across all sources) rate of data arriving.", "right")}</strong></div>
+ </div>
+ </td>
+ <td class="timeline">{graphUIDataForInputRate.generateTimelineHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForInputRate.generateHistogramHtml(jsCollector)}</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">
+ <div style="width: 160px;">
+ <div><strong>Process Rate {SparkUIUtils.tooltip("The aggregate (across all sources) rate at which Spark is processing data.", "right")}</strong></div>
+ </div>
+ </td>
+ <td class="timeline">{graphUIDataForProcessRate.generateTimelineHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForProcessRate.generateHistogramHtml(jsCollector)}</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">
+ <div style="width: 160px;">
+ <div><strong>Input Rows {SparkUIUtils.tooltip("The aggregate (across all sources) number of records processed in a trigger.", "right")}</strong></div>
+ </div>
+ </td>
+ <td class="timeline">{graphUIDataForInputRows.generateTimelineHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForInputRows.generateHistogramHtml(jsCollector)}</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">
+ <div style="width: 160px;">
+ <div><strong>Batch Duration {SparkUIUtils.tooltip("The process duration of each batch.", "right")}</strong></div>
+ </div>
+ </td>
+ <td class="timeline">{graphUIDataForBatchDuration.generateTimelineHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForBatchDuration.generateHistogramHtml(jsCollector)}</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">
+ <div style="width: auto;">
+ <div><strong>Operation Duration {SparkUIUtils.tooltip("The amount of time taken to perform various operations in milliseconds.", "right")}</strong></div>
+ </div>
+ </td>
+ <td class="duration-area-stack" colspan="2">{graphUIDataForDuration.generateAreaStackHtmlWithData(jsCollector, operationDurationData)}</td>
+ </tr>
+ </tbody>
+ </table>
+ // scalastyle:on
+
+ generateVar(operationDurationData) ++ generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
new file mode 100644
index 0000000..db085db
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.text.SimpleDateFormat
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+
+/**
+ * A customized StreamingQueryListener used in structured streaming UI, which contains all
+ * UI data for both active and inactive query.
+ * TODO: Add support for history server.
+ */
+private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {
+
+ private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+ timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+ /**
+ * We use runId as the key here instead of id in active query status map,
+ * because the runId is unique for every started query, even if it is a restart.
+ */
+ private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
+ private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
+
+ private val streamingProgressRetention = sqlConf.streamingProgressRetention
+ private val inactiveQueryStatusRetention = sqlConf.streamingUIInactiveQueryRetention
+
+ override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+ activeQueryStatus.putIfAbsent(event.runId,
+ new StreamingQueryUIData(event.name, event.id, event.runId, event.submissionTime))
+ }
+
+ override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+ val batchTimestamp = timestampFormat.parse(event.progress.timestamp).getTime
+ val queryStatus = activeQueryStatus.getOrDefault(
+ event.progress.runId,
+ new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
+ batchTimestamp))
+ queryStatus.updateProcess(event.progress, streamingProgressRetention)
+ }
+
+ override def onQueryTerminated(
+ event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
+ val queryStatus = activeQueryStatus.remove(event.runId)
+ if (queryStatus != null) {
+ queryStatus.queryTerminated(event)
+ inactiveQueryStatus += queryStatus
+ while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
+ inactiveQueryStatus.dequeue()
+ }
+ }
+ }
+
+ def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
+ activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
+ }
+}
+
+/**
+ * This class contains all messages related to UI display, each instance corresponds to a single
+ * [[org.apache.spark.sql.streaming.StreamingQuery]].
+ */
+private[ui] class StreamingQueryUIData(
+ val name: String,
+ val id: UUID,
+ val runId: UUID,
+ val submissionTime: Long) {
+
+ /** Holds the most recent query progress updates. */
+ private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+ private var _isActive = true
+ private var _exception: Option[String] = None
+
+ def isActive: Boolean = synchronized { _isActive }
+
+ def exception: Option[String] = synchronized { _exception }
+
+ def queryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
+ _isActive = false
+ _exception = event.exception
+ }
+
+ def updateProcess(
+ newProgress: StreamingQueryProgress, retentionNum: Int): Unit = progressBuffer.synchronized {
+ progressBuffer += newProgress
+ while (progressBuffer.length >= retentionNum) {
+ progressBuffer.dequeue()
+ }
+ }
+
+ def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+ progressBuffer.toArray
+ }
+
+ def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+ progressBuffer.lastOption.orNull
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
similarity index 54%
copy from streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
copy to sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
index 3ecc448..f909cfd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
@@ -14,35 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.spark.streaming.ui
+package org.apache.spark.sql.streaming.ui
import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.StreamingContext
import org.apache.spark.ui.{SparkUI, SparkUITab}
-/**
- * Spark Web UI tab that shows statistics of a streaming job.
- * This assumes the given SparkContext has enabled its SparkUI.
- */
-private[spark] class StreamingTab(val ssc: StreamingContext, sparkUI: SparkUI)
- extends SparkUITab(sparkUI, "streaming") with Logging {
+private[sql] class StreamingQueryTab(
+ val statusListener: StreamingQueryStatusListener,
+ sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
- private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static"
+ override val name = "Structured Streaming"
val parent = sparkUI
- val listener = ssc.progressListener
- attachPage(new StreamingPage(this))
- attachPage(new BatchPage(this))
+ attachPage(new StreamingQueryPage(this))
+ attachPage(new StreamingQueryStatisticsPage(this))
+ parent.attachTab(this)
- def attach(): Unit = {
- parent.attachTab(this)
- parent.addStaticHandler(STATIC_RESOURCE_DIR, "/static/streaming")
- }
+ parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql")
+}
- def detach(): Unit = {
- parent.detachTab(this)
- parent.detachHandler("/static/streaming")
- }
+object StreamingQueryTab {
+ private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
new file mode 100644
index 0000000..57b9dec
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+private[ui] object UIUtils {
+
+ /**
+ * Check whether `number` is valid, if not return 0.0d
+ */
+ def withNumberInvalid(number: => Double): Double = {
+ if (number.isNaN || number.isInfinite) {
+ 0.0d
+ } else {
+ number
+ }
+ }
+
+ /**
+ * Execute a block of code when there is already at least one completed batch in the streaming query,
+ * otherwise return `default` value.
+ */
+ def withNoProgress[T](query: StreamingQueryUIData, body: => T, default: T): T = {
+ if (query.lastProgress != null) {
+ body
+ } else {
+ default
+ }
+ }
+
+ def getQueryName(query: StreamingQueryUIData): String = {
+ if (query.name == null || query.name.isEmpty) {
+ "<no name>"
+ } else {
+ query.name
+ }
+ }
+
+ def getQueryStatus(query: StreamingQueryUIData): String = {
+ if (query.isActive) {
+ "RUNNING"
+ } else {
+ query.exception.map(_ => "FAILED").getOrElse("FINISHED")
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 2f66dd32..9d0f829 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable
import org.scalactic.TolerantNumerics
import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.Waiters.Waiter
@@ -34,6 +33,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryListener._
+import org.apache.spark.sql.streaming.ui.StreamingQueryStatusListener
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.util.JsonProtocol
@@ -47,7 +47,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
after {
spark.streams.active.foreach(_.stop())
assert(spark.streams.active.isEmpty)
- assert(spark.streams.listListeners().isEmpty)
+ // Skip check default `StreamingQueryStatusListener` which is for streaming UI.
+ assert(spark.streams.listListeners()
+ .filterNot(_.isInstanceOf[StreamingQueryStatusListener]).isEmpty)
// Make sure we don't leak any events to the next test
spark.sparkContext.listenerBus.waitUntilEmpty()
}
@@ -252,8 +254,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(newEvent.name === event.name)
}
- testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name"))
- testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null))
+ testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name", 1L))
+ testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null, 1L))
}
test("QueryProgressEvent serialization") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index b6a6be2..6f00b52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -312,6 +312,7 @@ object StreamingQueryStatusAndProgressSuite {
name = "myName",
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
+ batchDuration = 0L,
durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava),
eventTime = new java.util.HashMap(Map(
"max" -> "2016-12-05T20:54:20.827Z",
@@ -346,6 +347,7 @@ object StreamingQueryStatusAndProgressSuite {
name = null, // should not be present in the json
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
+ batchDuration = 0L,
durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava),
// empty maps should be handled correctly
eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4121f49..77f5c85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -466,7 +466,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
val streamingTriggerDF = spark.createDataset(1 to 10).toDF
val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
- val progress = getFirstProgress(streamingInputDF.join(streamingInputDF, "value"))
+ val progress = getStreamingQuery(streamingInputDF.join(streamingInputDF, "value"))
+ .recentProgress.head
assert(progress.numInputRows === 20) // data is read multiple times in self-joins
assert(progress.sources.size === 1)
assert(progress.sources(0).numInputRows === 20)
@@ -479,7 +480,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
// Trigger input has 10 rows, static input has 2 rows,
// therefore after the first trigger, the calculated input rows should be 10
- val progress = getFirstProgress(streamingInputDF.join(staticInputDF, "value"))
+ val progress = getStreamingQuery(streamingInputDF.join(staticInputDF, "value"))
+ .recentProgress.head
assert(progress.numInputRows === 10)
assert(progress.sources.size === 1)
assert(progress.sources(0).numInputRows === 10)
@@ -492,7 +494,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF)
// After the first trigger, the calculated input rows should be 10
- val progress = getFirstProgress(streamingInputDF)
+ val progress = getStreamingQuery(streamingInputDF).recentProgress.head
assert(progress.numInputRows === 10)
assert(progress.sources.size === 1)
assert(progress.sources(0).numInputRows === 10)
@@ -1120,12 +1122,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
StreamingExecutionRelation(source, spark)
}
- /** Returns the query progress at the end of the first trigger of streaming DF */
- private def getFirstProgress(streamingDF: DataFrame): StreamingQueryProgress = {
+ /** Returns the query at the end of the first trigger of streaming DF */
+ private def getStreamingQuery(streamingDF: DataFrame): StreamingQuery = {
try {
val q = streamingDF.writeStream.format("memory").queryName("test").start()
q.processAllAvailable()
- q.recentProgress.head
+ q
} finally {
spark.streams.active.map(_.stop())
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
new file mode 100644
index 0000000..de43e47
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.util.{Locale, UUID}
+import javax.servlet.http.HttpServletRequest
+
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.BeforeAndAfter
+import scala.xml.Node
+
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+import org.apache.spark.sql.test.SharedSparkSession
+
+class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
+
+ test("correctly display streaming query page") {
+ val id = UUID.randomUUID()
+ val request = mock(classOf[HttpServletRequest])
+ val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
+ val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+ when(tab.appName).thenReturn("testing")
+ when(tab.headerTabs).thenReturn(Seq.empty)
+ when(tab.statusListener).thenReturn(statusListener)
+
+ val streamQuery = createStreamQueryUIData(id)
+ when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
+ var html = renderStreamingQueryPage(request, tab)
+ .toString().toLowerCase(Locale.ROOT)
+ assert(html.contains("active streaming queries (1)"))
+ assert(html.contains("completed streaming queries (0)"))
+
+ when(streamQuery.isActive).thenReturn(false)
+ when(streamQuery.exception).thenReturn(None)
+ html = renderStreamingQueryPage(request, tab)
+ .toString().toLowerCase(Locale.ROOT)
+ assert(html.contains("active streaming queries (0)"))
+ assert(html.contains("completed streaming queries (1)"))
+ assert(html.contains("finished"))
+
+ when(streamQuery.isActive).thenReturn(false)
+ when(streamQuery.exception).thenReturn(Option("exception in query"))
+ html = renderStreamingQueryPage(request, tab)
+ .toString().toLowerCase(Locale.ROOT)
+ assert(html.contains("active streaming queries (0)"))
+ assert(html.contains("completed streaming queries (1)"))
+ assert(html.contains("failed"))
+ assert(html.contains("exception in query"))
+ }
+
+ test("correctly display streaming query statistics page") {
+ val id = UUID.randomUUID()
+ val request = mock(classOf[HttpServletRequest])
+ val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
+ val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+ when(request.getParameter("id")).thenReturn(id.toString)
+ when(tab.appName).thenReturn("testing")
+ when(tab.headerTabs).thenReturn(Seq.empty)
+ when(tab.statusListener).thenReturn(statusListener)
+
+ val streamQuery = createStreamQueryUIData(id)
+ when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
+ val html = renderStreamingQueryStatisticsPage(request, tab)
+ .toString().toLowerCase(Locale.ROOT)
+
+ assert(html.contains("<strong>name: </strong>query<"))
+ assert(html.contains("""{"x": 1001898000100, "y": 10.0}"""))
+ assert(html.contains("""{"x": 1001898000100, "y": 12.0}"""))
+ assert(html.contains("(<strong>3</strong> completed batches)"))
+ }
+
+ private def createStreamQueryUIData(id: UUID): StreamingQueryUIData = {
+ val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+ when(progress.timestamp).thenReturn("2001-10-01T01:00:00.100Z")
+ when(progress.inputRowsPerSecond).thenReturn(10.0)
+ when(progress.processedRowsPerSecond).thenReturn(12.0)
+ when(progress.batchId).thenReturn(2)
+ when(progress.prettyJson).thenReturn("""{"a":1}""")
+
+ val streamQuery = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
+ when(streamQuery.isActive).thenReturn(true)
+ when(streamQuery.name).thenReturn("query")
+ when(streamQuery.id).thenReturn(id)
+ when(streamQuery.runId).thenReturn(id)
+ when(streamQuery.submissionTime).thenReturn(1L)
+ when(streamQuery.lastProgress).thenReturn(progress)
+ when(streamQuery.recentProgress).thenReturn(Array(progress))
+ when(streamQuery.exception).thenReturn(None)
+
+ streamQuery
+ }
+
+ /**
+ * Render the streaming query page for the given request and tab, and return the HTML.
+ * The tab's status listener is mocked so the page is populated with useful content.
+ */
+ private def renderStreamingQueryPage(
+ request: HttpServletRequest,
+ tab: StreamingQueryTab): Seq[Node] = {
+ val page = new StreamingQueryPage(tab)
+ page.render(request)
+ }
+
+ private def renderStreamingQueryStatisticsPage(
+ request: HttpServletRequest,
+ tab: StreamingQueryTab): Seq[Node] = {
+ val page = new StreamingQueryStatisticsPage(tab)
+ page.render(request)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
new file mode 100644
index 0000000..bd74ed3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.util.UUID
+
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+
+import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest}
+import org.apache.spark.sql.streaming
+
+class StreamingQueryStatusListenerSuite extends StreamTest {
+
+ test("onQueryStarted, onQueryProgress, onQueryTerminated") {
+ val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
+
+ // handle query started event
+ val id = UUID.randomUUID()
+ val runId = UUID.randomUUID()
+ val startEvent = new StreamingQueryListener.QueryStartedEvent(id, runId, "test", 1L)
+ listener.onQueryStarted(startEvent)
+
+ // result checking
+ assert(listener.activeQueryStatus.size() == 1)
+ assert(listener.activeQueryStatus.get(runId).name == "test")
+
+ // handle query progress event
+ val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+ when(progress.id).thenReturn(id)
+ when(progress.runId).thenReturn(runId)
+ when(progress.timestamp).thenReturn("2001-10-01T01:00:00.100Z")
+ when(progress.inputRowsPerSecond).thenReturn(10.0)
+ when(progress.processedRowsPerSecond).thenReturn(12.0)
+ when(progress.batchId).thenReturn(2)
+ when(progress.prettyJson).thenReturn("""{"a":1}""")
+ val processEvent = new streaming.StreamingQueryListener.QueryProgressEvent(progress)
+ listener.onQueryProgress(processEvent)
+
+ // result checking
+ val activeQuery = listener.activeQueryStatus.get(runId)
+ assert(activeQuery.isActive)
+ assert(activeQuery.recentProgress.length == 1)
+ assert(activeQuery.lastProgress.id == id)
+ assert(activeQuery.lastProgress.runId == runId)
+ assert(activeQuery.lastProgress.timestamp == "2001-10-01T01:00:00.100Z")
+ assert(activeQuery.lastProgress.inputRowsPerSecond == 10.0)
+ assert(activeQuery.lastProgress.processedRowsPerSecond == 12.0)
+ assert(activeQuery.lastProgress.batchId == 2)
+ assert(activeQuery.lastProgress.prettyJson == """{"a":1}""")
+
+ // handle terminate event
+ val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None)
+ listener.onQueryTerminated(terminateEvent)
+
+ assert(!listener.inactiveQueryStatus.head.isActive)
+ assert(listener.inactiveQueryStatus.head.runId == runId)
+ assert(listener.inactiveQueryStatus.head.id == id)
+ }
+
+ test("same query start multiple times") {
+ val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
+
+ // handle first time start
+ val id = UUID.randomUUID()
+ val runId0 = UUID.randomUUID()
+ val startEvent0 = new StreamingQueryListener.QueryStartedEvent(id, runId0, "test", 1L)
+ listener.onQueryStarted(startEvent0)
+
+ // handle terminate event
+ val terminateEvent0 = new StreamingQueryListener.QueryTerminatedEvent(id, runId0, None)
+ listener.onQueryTerminated(terminateEvent0)
+
+ // handle second time start
+ val runId1 = UUID.randomUUID()
+ val startEvent1 = new StreamingQueryListener.QueryStartedEvent(id, runId1, "test", 1L)
+ listener.onQueryStarted(startEvent1)
+
+ // result checking
+ assert(listener.activeQueryStatus.size() == 1)
+ assert(listener.inactiveQueryStatus.length == 1)
+ assert(listener.activeQueryStatus.containsKey(runId1))
+ assert(listener.activeQueryStatus.get(runId1).id == id)
+ assert(listener.inactiveQueryStatus.head.runId == runId0)
+ assert(listener.inactiveQueryStatus.head.id == id)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala
new file mode 100644
index 0000000..46f2ead
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.Matchers
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+
+class UIUtilsSuite extends SparkFunSuite with Matchers {
+ test("streaming query started with no batch completed") {
+ val query = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
+ when(query.lastProgress).thenReturn(null)
+
+ assert(0 == UIUtils.withNoProgress(query, 1, 0))
+ }
+
+ test("streaming query started with at least one batch completed") {
+ val query = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
+ val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+ when(query.lastProgress).thenReturn(progress)
+
+ assert(1 == UIUtils.withNoProgress(query, 1, 0))
+ }
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index adfda0c5..890a668 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -375,21 +375,7 @@ private[ui] class SqlStatsPagedTable(
} else {
errorMessage
})
- val details = if (isMultiline) {
- // scalastyle:off
- <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
- class="expand-details">
- + details
- </span> ++
- <div class="stacktrace-details collapsed">
- <pre>
- {errorMessage}
- </pre>
- </div>
- // scalastyle:on
- } else {
- ""
- }
+ val details = detailsUINode(isMultiline, errorMessage)
<td>
{errorSummary}{details}
</td>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index e360b4a..6c981b2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -33,7 +33,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
import org.apache.spark.util.{CallSite, Utils}
/**
@@ -138,7 +138,7 @@ abstract class DStream[T: ClassTag] (
*/
private def makeScope(time: Time): Option[RDDOperationScope] = {
baseScope.map { bsJson =>
- val formattedBatchTime = UIUtils.formatBatchTime(
+ val formattedBatchTime = SparkUIUtils.formatBatchTime(
time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
val bs = RDDOperationScope.fromJson(bsJson)
val baseName = bs.name // e.g. countByWindow, "kafka stream [0]"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 5d543c5..7eea57c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.api.python.PythonDStream
-import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
import org.apache.spark.util.{EventLoop, ThreadUtils, Utils}
@@ -230,7 +230,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
val oldProps = ssc.sparkContext.getLocalProperties
try {
ssc.sparkContext.setLocalProperties(Utils.cloneProperties(ssc.savedProperties.get()))
- val formattedTime = UIUtils.formatBatchTime(
+ val formattedTime = SparkUIUtils.formatBatchTime(
job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index f1070e9..b5a0e92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -51,7 +51,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
- val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
+ val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
val numRecords = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 2c85d26..04cd063 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -325,7 +325,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
throw new IllegalArgumentException(s"Missing id parameter")
}
val formattedBatchTime =
- UIUtils.formatBatchTime(batchTime.milliseconds, streamingListener.batchDuration)
+ SparkUIUtils.formatBatchTime(batchTime.milliseconds, streamingListener.batchDuration)
val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 31ebb4c..d47287b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -20,78 +20,10 @@ package org.apache.spark.streaming.ui
import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest
-import scala.collection.mutable.ArrayBuffer
import scala.xml.{Node, Unparsed}
import org.apache.spark.internal.Logging
-import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
-
-/**
- * A helper class to generate JavaScript and HTML for both timeline and histogram graphs.
- *
- * @param timelineDivId the timeline `id` used in the html `div` tag
- * @param histogramDivId the timeline `id` used in the html `div` tag
- * @param data the data for the graph
- * @param minX the min value of X axis
- * @param maxX the max value of X axis
- * @param minY the min value of Y axis
- * @param maxY the max value of Y axis
- * @param unitY the unit of Y axis
- * @param batchInterval if `batchInterval` is not None, we will draw a line for `batchInterval` in
- * the graph
- */
-private[ui] class GraphUIData(
- timelineDivId: String,
- histogramDivId: String,
- data: Seq[(Long, Double)],
- minX: Long,
- maxX: Long,
- minY: Double,
- maxY: Double,
- unitY: String,
- batchInterval: Option[Double] = None) {
-
- private var dataJavaScriptName: String = _
-
- def generateDataJs(jsCollector: JsCollector): Unit = {
- val jsForData = data.map { case (x, y) =>
- s"""{"x": $x, "y": $y}"""
- }.mkString("[", ",", "]")
- dataJavaScriptName = jsCollector.nextVariableName
- jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;")
- }
-
- def generateTimelineHtml(jsCollector: JsCollector): Seq[Node] = {
- jsCollector.addPreparedStatement(s"registerTimeline($minY, $maxY);")
- if (batchInterval.isDefined) {
- jsCollector.addStatement(
- "drawTimeline(" +
- s"'#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY, '$unitY'," +
- s" ${batchInterval.get}" +
- ");")
- } else {
- jsCollector.addStatement(
- s"drawTimeline('#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY," +
- s" '$unitY');")
- }
- <div id={timelineDivId}></div>
- }
-
- def generateHistogramHtml(jsCollector: JsCollector): Seq[Node] = {
- val histogramData = s"$dataJavaScriptName.map(function(d) { return d.y; })"
- jsCollector.addPreparedStatement(s"registerHistogram($histogramData, $minY, $maxY);")
- if (batchInterval.isDefined) {
- jsCollector.addStatement(
- "drawHistogram(" +
- s"'#$histogramDivId', $histogramData, $minY, $maxY, '$unitY', ${batchInterval.get}" +
- ");")
- } else {
- jsCollector.addStatement(
- s"drawHistogram('#$histogramDivId', $histogramData, $minY, $maxY, '$unitY');")
- }
- <div id={histogramDivId}></div>
- }
-}
+import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
/**
* A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
@@ -164,8 +96,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
private def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
// scalastyle:off
<script src={SparkUIUtils.prependBaseUri(request, "/static/d3.min.js")}></script>
- <link rel="stylesheet" href={SparkUIUtils.prependBaseUri(request, "/static/streaming/streaming-page.css")} type="text/css"/>
- <script src={SparkUIUtils.prependBaseUri(request, "/static/streaming/streaming-page.js")}></script>
+ <link rel="stylesheet" href={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.css")} type="text/css"/>
+ <script src={SparkUIUtils.prependBaseUri(request, "/static/streaming-page.js")}></script>
// scalastyle:on
}
@@ -201,7 +133,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
private def generateTimeMap(times: Seq[Long]): Seq[Node] = {
val js = "var timeFormat = {};\n" + times.map { time =>
val formattedTime =
- UIUtils.formatBatchTime(time, listener.batchDuration, showYYYYMMSS = false)
+ SparkUIUtils.formatBatchTime(time, listener.batchDuration, showYYYYMMSS = false)
s"timeFormat[$time] = '$formattedTime';"
}.mkString("\n")
@@ -544,52 +476,3 @@ private[ui] object StreamingPage {
}
-/**
- * A helper class that allows the user to add JavaScript statements which will be executed when the
- * DOM has finished loading.
- */
-private[ui] class JsCollector {
-
- private var variableId = 0
-
- /**
- * Return the next unused JavaScript variable name
- */
- def nextVariableName: String = {
- variableId += 1
- "v" + variableId
- }
-
- /**
- * JavaScript statements that will execute before `statements`
- */
- private val preparedStatements = ArrayBuffer[String]()
-
- /**
- * JavaScript statements that will execute after `preparedStatements`
- */
- private val statements = ArrayBuffer[String]()
-
- def addPreparedStatement(js: String): Unit = {
- preparedStatements += js
- }
-
- def addStatement(js: String): Unit = {
- statements += js
- }
-
- /**
- * Generate a html snippet that will execute all scripts when the DOM has finished loading.
- */
- def toHtml: Seq[Node] = {
- val js =
- s"""
- |$$(document).ready(function() {
- | ${preparedStatements.mkString("\n")}
- | ${statements.mkString("\n")}
- |});""".stripMargin
-
- <script>{Unparsed(js)}</script>
- }
-}
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index 3ecc448..d616b47 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -28,7 +28,7 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
private[spark] class StreamingTab(val ssc: StreamingContext, sparkUI: SparkUI)
extends SparkUITab(sparkUI, "streaming") with Logging {
- private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static"
+ private val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
val parent = sparkUI
val listener = ssc.progressListener
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index c21912a..dc1af0a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -17,14 +17,14 @@
package org.apache.spark.streaming.ui
-import java.text.SimpleDateFormat
-import java.util.{Locale, TimeZone}
import java.util.concurrent.TimeUnit
import scala.xml.Node
import org.apache.commons.text.StringEscapeUtils
+import org.apache.spark.ui.{ UIUtils => SparkUIUtils }
+
private[streaming] object UIUtils {
/**
@@ -78,59 +78,6 @@ private[streaming] object UIUtils {
case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
}
- // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
- private val batchTimeFormat = new ThreadLocal[SimpleDateFormat]() {
- override def initialValue(): SimpleDateFormat =
- new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.US)
- }
-
- private val batchTimeFormatWithMilliseconds = new ThreadLocal[SimpleDateFormat]() {
- override def initialValue(): SimpleDateFormat =
- new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US)
- }
-
- /**
- * If `batchInterval` is less than 1 second, format `batchTime` with milliseconds. Otherwise,
- * format `batchTime` without milliseconds.
- *
- * @param batchTime the batch time to be formatted
- * @param batchInterval the batch interval
- * @param showYYYYMMSS if showing the `yyyy/MM/dd` part. If it's false, the return value wll be
- * only `HH:mm:ss` or `HH:mm:ss.SSS` depending on `batchInterval`
- * @param timezone only for test
- */
- def formatBatchTime(
- batchTime: Long,
- batchInterval: Long,
- showYYYYMMSS: Boolean = true,
- timezone: TimeZone = null): String = {
- val oldTimezones =
- (batchTimeFormat.get.getTimeZone, batchTimeFormatWithMilliseconds.get.getTimeZone)
- if (timezone != null) {
- batchTimeFormat.get.setTimeZone(timezone)
- batchTimeFormatWithMilliseconds.get.setTimeZone(timezone)
- }
- try {
- val formattedBatchTime =
- if (batchInterval < 1000) {
- batchTimeFormatWithMilliseconds.get.format(batchTime)
- } else {
- // If batchInterval >= 1 second, don't show milliseconds
- batchTimeFormat.get.format(batchTime)
- }
- if (showYYYYMMSS) {
- formattedBatchTime
- } else {
- formattedBatchTime.substring(formattedBatchTime.indexOf(' ') + 1)
- }
- } finally {
- if (timezone != null) {
- batchTimeFormat.get.setTimeZone(oldTimezones._1)
- batchTimeFormatWithMilliseconds.get.setTimeZone(oldTimezones._2)
- }
- }
- }
-
def createOutputOperationFailureForUI(failure: String): String = {
if (failure.startsWith("org.apache.spark.Spark")) {
// SparkException or SparkDriverExecutionException
@@ -164,19 +111,7 @@ private[streaming] object UIUtils {
} else {
failureReason
}
- val details = if (isMultiline) {
- // scalastyle:off
- <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
- class="expand-details">
- +details
- </span> ++
- <div class="stacktrace-details collapsed">
- <pre>{failureDetails}</pre>
- </div>
- // scalastyle:on
- } else {
- ""
- }
+ val details = SparkUIUtils.detailsUINode(isMultiline, failureDetails)
if (rowspan == 1) {
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index 1bb4116..36036fc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -19,12 +19,10 @@ package org.apache.spark.streaming
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
import org.apache.spark.util.ManualClock
/**
@@ -214,7 +212,7 @@ class DStreamScopeSuite
rddScope: RDDOperationScope,
batchTime: Long): Unit = {
val (baseScopeId, baseScopeName) = (baseScope.id, baseScope.name)
- val formattedBatchTime = UIUtils.formatBatchTime(
+ val formattedBatchTime = SparkUIUtils.formatBatchTime(
batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
assert(rddScope.id === s"${baseScopeId}_$batchTime")
assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
index d3ca2b5..5760837 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import org.scalatest.Matchers
import org.apache.spark.SparkFunSuite
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
class UIUtilsSuite extends SparkFunSuite with Matchers{
@@ -70,10 +71,13 @@ class UIUtilsSuite extends SparkFunSuite with Matchers{
test("formatBatchTime") {
val tzForTest = TimeZone.getTimeZone("America/Los_Angeles")
val batchTime = 1431637480452L // Thu May 14 14:04:40 PDT 2015
- assert("2015/05/14 14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, timezone = tzForTest))
+ assert("2015/05/14 14:04:40" ===
+ SparkUIUtils.formatBatchTime(batchTime, 1000, timezone = tzForTest))
assert("2015/05/14 14:04:40.452" ===
- UIUtils.formatBatchTime(batchTime, 999, timezone = tzForTest))
- assert("14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, false, timezone = tzForTest))
- assert("14:04:40.452" === UIUtils.formatBatchTime(batchTime, 999, false, timezone = tzForTest))
+ SparkUIUtils.formatBatchTime(batchTime, 999, timezone = tzForTest))
+ assert("14:04:40" ===
+ SparkUIUtils.formatBatchTime(batchTime, 1000, false, timezone = tzForTest))
+ assert("14:04:40.452" ===
+ SparkUIUtils.formatBatchTime(batchTime, 999, false, timezone = tzForTest))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org