You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2020/06/12 15:28:51 UTC
[spark] branch master updated: [SPARK-30119][WEBUI] Support pagination for streaming tab
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9b098f1 [SPARK-30119][WEBUI] Support pagination for streaming tab
9b098f1 is described below
commit 9b098f1eb91a5e9f488d573bfeea3f6bfd9b95b3
Author: iRakson <ra...@gmail.com>
AuthorDate: Fri Jun 12 10:27:31 2020 -0500
[SPARK-30119][WEBUI] Support pagination for streaming tab
### What changes were proposed in this pull request?
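#28747 reverted #28439 because of a flaky test case. This PR fixes the flaky test and re-adds pagination support. The test now uses a 100 ms batch interval and requests the completed-batches table sorted by ascending batch time, so the link to the first batch is found deterministically.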
### Why are the changes needed?
To support pagination for the batch tables in the Streaming tab of the web UI.
### Does this PR introduce _any_ user-facing change?
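Yes. The batch tables in the Streaming tab are now paginated and sortable, and the former "Active Batches" table is split into separate "Running Batches" and "Waiting Batches" tables.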
### How was this patch tested?
Manually. UISeleniumSuite was also updated to check the paginated completed-batches table.
Closes #28748 from iRakson/fixstreamingpagination.
Authored-by: iRakson <ra...@gmail.com>
Signed-off-by: Sean Owen <sr...@gmail.com>
---
.../resources/org/apache/spark/ui/static/webui.js | 3 +-
.../spark/streaming/ui/AllBatchesTable.scala | 282 +++++++++++----------
.../apache/spark/streaming/ui/StreamingPage.scala | 113 ++++++---
.../apache/spark/streaming/UISeleniumSuite.scala | 39 ++-
4 files changed, 259 insertions(+), 178 deletions(-)
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js
index 4f8409c..bb37256 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -87,7 +87,8 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
- collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
+ collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
+ collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 1e443f6..c0eec0e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -17,30 +17,41 @@
package org.apache.spark.streaming.ui
-import scala.xml.Node
-
-import org.apache.spark.ui.{UIUtils => SparkUIUtils}
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
+import javax.servlet.http.HttpServletRequest
-private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
+import scala.xml.Node
- protected def columns: Seq[Node] = {
- <th>Batch Time</th>
- <th>Records</th>
- <th>Scheduling Delay
- {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}
- </th>
- <th>Processing Time
- {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
- }
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}
+
+private[ui] class StreamingPagedTable(
+ request: HttpServletRequest,
+ tableTag: String,
+ batches: Seq[BatchUIData],
+ basePath: String,
+ subPath: String,
+ batchInterval: Long) extends PagedTable[BatchUIData] {
+
+ private val(sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
+ private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
+ private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+ private val firstFailureReason: Option[String] =
+ if (!tableTag.equals("waitingBatches")) {
+ getFirstFailureReason(batches)
+ } else {
+ None
+ }
/**
* Return the first failure reason if finding in the batches.
*/
- protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
+ private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
}
- protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
+ private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
firstFailureReason.map { failureReason =>
val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
@@ -49,147 +60,154 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
}.getOrElse(<td>-</td>)
}
- protected def baseRow(batch: BatchUIData): Seq[Node] = {
- val batchTime = batch.batchTime.milliseconds
- val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
- val numRecords = batch.numRecords
- val schedulingDelay = batch.schedulingDelay
- val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
- val processingTime = batch.processingDelay
- val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
- val batchTimeId = s"batch-$batchTime"
-
- <td id={batchTimeId} sorttable_customkey={batchTime.toString}
- isFailed={batch.isFailed.toString}>
- <a href={s"batch?id=$batchTime"}>
- {formattedBatchTime}
- </a>
- </td>
- <td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
- <td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
- {formattedSchedulingDelay}
- </td>
- <td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}>
- {formattedProcessingTime}
- </td>
- }
-
- private def batchTable: Seq[Node] = {
- <table id={tableId} class="table table-bordered table-striped table-sm sortable">
- <thead>
- {columns}
- </thead>
- <tbody>
- {renderRows}
- </tbody>
- </table>
- }
-
- def toNodeSeq: Seq[Node] = {
- batchTable
- }
-
- protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
+ private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
<td class="progress-cell">
{
- SparkUIUtils.makeProgressBar(
- started = batch.numActiveOutputOp,
- completed = batch.numCompletedOutputOp,
- failed = batch.numFailedOutputOp,
- skipped = 0,
- reasonToNumKilled = Map.empty,
- total = batch.outputOperations.size)
+ SparkUIUtils.makeProgressBar(
+ started = batch.numActiveOutputOp,
+ completed = batch.numCompletedOutputOp,
+ failed = batch.numFailedOutputOp,
+ skipped = 0,
+ reasonToNumKilled = Map.empty,
+ total = batch.outputOperations.size)
}
</td>
}
- /**
- * Return HTML for all rows of this table.
- */
- protected def renderRows: Seq[Node]
-}
+ override def tableId: String = s"$tableTag-table"
-private[ui] class ActiveBatchTable(
- runningBatches: Seq[BatchUIData],
- waitingBatches: Seq[BatchUIData],
- batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
+ override def tableCssClass: String =
+ "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
- private val firstFailureReason = getFirstFailureReason(runningBatches)
+ override def pageSizeFormField: String = s"$tableTag.pageSize"
- override protected def columns: Seq[Node] = super.columns ++ {
- <th>Output Ops: Succeeded/Total</th>
- <th>Status</th> ++ {
- if (firstFailureReason.nonEmpty) {
- <th>Error</th>
- } else {
- Nil
- }
- }
- }
+ override def pageNumberFormField: String = s"$tableTag.page"
- override protected def renderRows: Seq[Node] = {
- // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
- // waiting batches before running batches
- waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++
- runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
+ override def pageLink(page: Int): String = {
+ parameterPath +
+ s"&$tableTag.sort=$encodedSortColumn" +
+ s"&$tableTag.desc=$desc" +
+ s"&$pageNumberFormField=$page" +
+ s"&$pageSizeFormField=$pageSize" +
+ s"#$tableTag"
}
- private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
- baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
- if (firstFailureReason.nonEmpty) {
- getFirstFailureTableCell(batch)
- } else {
- Nil
+ override def goButtonFormPath: String =
+ s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag"
+
+ override def dataSource: PagedDataSource[BatchUIData] =
+ new StreamingDataSource(batches, pageSize, sortColumn, desc)
+
+ override def headers: Seq[Node] = {
+ // headers, sortable and tooltips
+ val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
+ Seq(
+ ("Batch Time", true, None),
+ ("Records", true, None),
+ ("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " +
+ "of a batch")),
+ ("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ {
+ if (tableTag.equals("completedBatches")) {
+ Seq(
+ ("Total Delay", true, Some("Total time taken to handle a batch")),
+ ("Output Ops: Succeeded/Total", false, None))
+ } else {
+ Seq(
+ ("Output Ops: Succeeded/Total", false, None),
+ ("Status", false, None))
+ }
+ } ++ {
+ if (firstFailureReason.nonEmpty) {
+ Seq(("Error", false, None))
+ } else {
+ Nil
+ }
}
}
+ // check if sort column is a valid sortable column
+ isSortColumnValid(headersAndCssClasses, sortColumn)
+
+ headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
}
- private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
- baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>++ {
- if (firstFailureReason.nonEmpty) {
- // Waiting batches have not run yet, so must have no failure reasons.
- <td>-</td>
- } else {
- Nil
+ override def row(batch: BatchUIData): Seq[Node] = {
+ val batchTime = batch.batchTime.milliseconds
+ val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
+ val numRecords = batch.numRecords
+ val schedulingDelay = batch.schedulingDelay
+ val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+ val processingTime = batch.processingDelay
+ val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
+ val batchTimeId = s"batch-$batchTime"
+ val totalDelay = batch.totalDelay
+ val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+
+ <tr>
+ <td id={batchTimeId} isFailed={batch.isFailed.toString}>
+ <a href={s"batch?id=$batchTime"}>
+ {formattedBatchTime}
+ </a>
+ </td>
+ <td> {numRecords.toString} records </td>
+ <td> {formattedSchedulingDelay} </td>
+ <td> {formattedProcessingTime} </td>
+ {
+ if (tableTag.equals("completedBatches")) {
+ <td> {formattedTotalDelay} </td> ++
+ createOutputOperationProgressBar(batch) ++ {
+ if (firstFailureReason.nonEmpty) {
+ getFirstFailureTableCell(batch)
+ } else {
+ Nil
+ }
+ }
+ } else if (tableTag.equals("runningBatches")) {
+ createOutputOperationProgressBar(batch) ++
+ <td> processing </td> ++ {
+ if (firstFailureReason.nonEmpty) {
+ getFirstFailureTableCell(batch)
+ } else {
+ Nil
+ }
+ }
+ } else {
+ createOutputOperationProgressBar(batch) ++
+ <td> queued </td> ++ {
+ if (firstFailureReason.nonEmpty) {
+ // Waiting batches have not run yet, so must have no failure reasons.
+ <td>-</td>
+ } else {
+ Nil
+ }
+ }
+ }
}
- }
+ </tr>
}
}
-private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
- extends BatchTableBase("completed-batches-table", batchInterval) {
+private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String,
+ desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) {
- private val firstFailureReason = getFirstFailureReason(batches)
+ private val data = info.sorted(ordering(sortColumn, desc))
- override protected def columns: Seq[Node] = super.columns ++ {
- <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
- <th>Output Ops: Succeeded/Total</th> ++ {
- if (firstFailureReason.nonEmpty) {
- <th>Error</th>
- } else {
- Nil
- }
- }
- }
+ override protected def dataSize: Int = data.size
- override protected def renderRows: Seq[Node] = {
- batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
- }
+ override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to)
- private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
- val totalDelay = batch.totalDelay
- val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
-
- baseRow(batch) ++ {
- <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
- {formattedTotalDelay}
- </td>
- } ++ createOutputOperationProgressBar(batch)++ {
- if (firstFailureReason.nonEmpty) {
- getFirstFailureTableCell(batch)
- } else {
- Nil
- }
+ private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = {
+ val ordering: Ordering[BatchUIData] = column match {
+ case "Batch Time" => Ordering.by(_.batchTime.milliseconds)
+ case "Records" => Ordering.by(_.numRecords)
+ case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
+ case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue))
+ case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue))
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
}
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 3bdf009..42d0e50 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -20,10 +20,12 @@ package org.apache.spark.streaming.ui
import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest
+import scala.collection.mutable
import scala.xml.{Node, Unparsed}
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
+import org.apache.spark.util.Utils
/**
* A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
@@ -86,7 +88,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
onClickTimelineFunc ++ basicInfo ++
listener.synchronized {
generateStatTable() ++
- generateBatchListTables()
+ generateBatchListTables(request)
}
SparkUIUtils.headerSparkPage(request, "Streaming Statistics", content, parent)
}
@@ -432,50 +434,97 @@ private[ui] class StreamingPage(parent: StreamingTab)
</tr>
}
- private def generateBatchListTables(): Seq[Node] = {
+ private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData],
+ tableTag: String): Seq[Node] = {
+ val interval: Long = listener.batchDuration
+ val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)
+
+ try {
+ new StreamingPagedTable(
+ request,
+ tableTag,
+ batches,
+ SparkUIUtils.prependBaseUri(request, parent.basePath),
+ "streaming",
+ interval
+ ).table(streamingPage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering streaming table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
+ }
+
+ private def generateBatchListTables(request: HttpServletRequest): Seq[Node] = {
val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse
val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse
val completedBatches = listener.retainedCompletedBatches.
sortBy(_.batchTime.milliseconds).reverse
- val activeBatchesContent = {
- <div class="row">
- <div class="col-12">
- <span id="activeBatches" class="collapse-aggregated-activeBatches collapse-table"
- onClick="collapseTable('collapse-aggregated-activeBatches',
- 'aggregated-activeBatches')">
- <h4>
- <span class="collapse-table-arrow arrow-open"></span>
- <a>Active Batches ({runningBatches.size + waitingBatches.size})</a>
- </h4>
- </span>
- <div class="aggregated-activeBatches collapsible-table">
- {new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq}
+ val content = mutable.ListBuffer[Node]()
+
+ if (runningBatches.nonEmpty) {
+ content ++=
+ <div class="row">
+ <div class="col-12">
+ <span id="runningBatches" class="collapse-aggregated-runningBatches collapse-table"
+ onClick="collapseTable('collapse-aggregated-runningBatches',
+ 'aggregated-runningBatches')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Running Batches ({runningBatches.size})</a>
+ </h4>
+ </span>
+ <div class="aggregated-runningBatches collapsible-table">
+ { streamingTable(request, runningBatches, "runningBatches") }
+ </div>
</div>
</div>
- </div>
}
- val completedBatchesContent = {
- <div class="row">
- <div class="col-12">
- <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
- onClick="collapseTable('collapse-aggregated-completedBatches',
- 'aggregated-completedBatches')">
- <h4>
- <span class="collapse-table-arrow arrow-open"></span>
- <a>Completed Batches (last {completedBatches.size}
- out of {listener.numTotalCompletedBatches})</a>
- </h4>
- </span>
- <div class="aggregated-completedBatches collapsible-table">
- {new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq}
+ if (waitingBatches.nonEmpty) {
+ content ++=
+ <div class="row">
+ <div class="col-12">
+ <span id="waitingBatches" class="collapse-aggregated-waitingBatches collapse-table"
+ onClick="collapseTable('collapse-aggregated-waitingBatches',
+ 'aggregated-waitingBatches')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Waiting Batches ({waitingBatches.size})</a>
+ </h4>
+ </span>
+ <div class="aggregated-waitingBatches collapsible-table">
+ { streamingTable(request, waitingBatches, "waitingBatches") }
+ </div>
</div>
</div>
- </div>
}
- activeBatchesContent ++ completedBatchesContent
+ if (completedBatches.nonEmpty) {
+ content ++=
+ <div class="row">
+ <div class="col-12">
+ <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
+ onClick="collapseTable('collapse-aggregated-completedBatches',
+ 'aggregated-completedBatches')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Completed Batches (last {completedBatches.size}
+ out of {listener.numTotalCompletedBatches})</a>
+ </h4>
+ </span>
+ <div class="aggregated-completedBatches collapsible-table">
+ { streamingTable(request, completedBatches, "completedBatches") }
+ </div>
+ </div>
+ </div>
+ }
+ content
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index bdc9e9e..7041e46 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -63,7 +63,7 @@ class UISeleniumSuite
.setMaster("local")
.setAppName("test")
.set(UI_ENABLED, true)
- val ssc = new StreamingContext(conf, Seconds(1))
+ val ssc = new StreamingContext(conf, Milliseconds(100))
assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
ssc
}
@@ -104,7 +104,7 @@ class UISeleniumSuite
find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
}
- eventually(timeout(10.seconds), interval(50.milliseconds)) {
+ eventually(timeout(10.seconds), interval(500.milliseconds)) {
// check whether streaming page exists
go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@@ -125,24 +125,37 @@ class UISeleniumSuite
// Check batch tables
val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
- h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
- findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
- List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
- "Output Ops: Succeeded/Total", "Status")
- }
- findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
- List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
- "Total Delay (?)", "Output Ops: Succeeded/Total")
+ val arrow = 0x25BE.toChar
+ findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be {
+ List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
+ "Total Delay", "Output Ops: Succeeded/Total")
}
- val batchLinks =
- findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq
+ val pageSize = 1
+ val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" +
+ "&completedBatches.desc=true&completedBatches.page=1" +
+ s"&completedBatches.pageSize=$pageSize#completedBatches"
+
+ go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath)
+ val completedTableRows = findAll(cssSelector("""#completedBatches-table tr"""))
+ .map(_.text).toList
+ // header row + pagesize
+ completedTableRows.length should be (1 + pageSize)
+
+ val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" +
+ s"&completedBatches.desc=false&completedBatches.pageSize=$pageSize#completedBatches"
+
+ // sort batches in ascending order of batch time
+ go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath)
+
+ val batchLinks = findAll(cssSelector("""#completedBatches-table td a"""))
+ .flatMap(_.attribute("href")).toSeq
batchLinks.size should be >= 1
// Check a normal batch page
- go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
+ go to (batchLinks.head) // Head is the first batch, so it will have some jobs
val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
summaryText should contain ("Batch Duration:")
summaryText should contain ("Input data size:")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org