You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2020/06/12 15:28:51 UTC

[spark] branch master updated: [SPARK-30119][WEBUI] Support pagination for streaming tab

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b098f1  [SPARK-30119][WEBUI] Support pagination for streaming tab
9b098f1 is described below

commit 9b098f1eb91a5e9f488d573bfeea3f6bfd9b95b3
Author: iRakson <ra...@gmail.com>
AuthorDate: Fri Jun 12 10:27:31 2020 -0500

    [SPARK-30119][WEBUI] Support pagination for streaming tab
    
    ### What changes were proposed in this pull request?
    #28747 reverted #28439 due to some flaky test case. This PR fixes the flaky test and adds pagination support.
    
    ### Why are the changes needed?
    To support pagination for streaming tab
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, Now streaming tab tables will be paginated.
    
    ### How was this patch tested?
    Manually.
    
    Closes #28748 from iRakson/fixstreamingpagination.
    
    Authored-by: iRakson <ra...@gmail.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../resources/org/apache/spark/ui/static/webui.js  |   3 +-
 .../spark/streaming/ui/AllBatchesTable.scala       | 282 +++++++++++----------
 .../apache/spark/streaming/ui/StreamingPage.scala  | 113 ++++++---
 .../apache/spark/streaming/UISeleniumSuite.scala   |  39 ++-
 4 files changed, 259 insertions(+), 178 deletions(-)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js
index 4f8409c..bb37256 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -87,7 +87,8 @@ $(function() {
   collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
   collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
   collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
-  collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
+  collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
+  collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
   collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
   collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
   collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 1e443f6..c0eec0e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -17,30 +17,41 @@
 
 package org.apache.spark.streaming.ui
 
-import scala.xml.Node
-
-import org.apache.spark.ui.{UIUtils => SparkUIUtils}
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
+import javax.servlet.http.HttpServletRequest
 
-private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
+import scala.xml.Node
 
-  protected def columns: Seq[Node] = {
-    <th>Batch Time</th>
-      <th>Records</th>
-      <th>Scheduling Delay
-        {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}
-      </th>
-      <th>Processing Time
-        {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
-  }
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}
+
+private[ui] class StreamingPagedTable(
+    request: HttpServletRequest,
+    tableTag: String,
+    batches: Seq[BatchUIData],
+    basePath: String,
+    subPath: String,
+    batchInterval: Long) extends PagedTable[BatchUIData] {
+
+  private val(sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
+  private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
+  private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+  private val firstFailureReason: Option[String] =
+    if (!tableTag.equals("waitingBatches")) {
+      getFirstFailureReason(batches)
+    } else {
+      None
+    }
 
   /**
    * Return the first failure reason if finding in the batches.
    */
-  protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
+  private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
     batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
   }
 
-  protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
+  private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
     val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
     firstFailureReason.map { failureReason =>
       val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
@@ -49,147 +60,154 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
     }.getOrElse(<td>-</td>)
   }
 
-  protected def baseRow(batch: BatchUIData): Seq[Node] = {
-    val batchTime = batch.batchTime.milliseconds
-    val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
-    val numRecords = batch.numRecords
-    val schedulingDelay = batch.schedulingDelay
-    val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
-    val processingTime = batch.processingDelay
-    val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
-    val batchTimeId = s"batch-$batchTime"
-
-    <td id={batchTimeId} sorttable_customkey={batchTime.toString}
-        isFailed={batch.isFailed.toString}>
-      <a href={s"batch?id=$batchTime"}>
-        {formattedBatchTime}
-      </a>
-    </td>
-      <td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
-      <td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
-        {formattedSchedulingDelay}
-      </td>
-      <td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}>
-        {formattedProcessingTime}
-      </td>
-  }
-
-  private def batchTable: Seq[Node] = {
-    <table id={tableId} class="table table-bordered table-striped table-sm sortable">
-      <thead>
-        {columns}
-      </thead>
-      <tbody>
-        {renderRows}
-      </tbody>
-    </table>
-  }
-
-  def toNodeSeq: Seq[Node] = {
-    batchTable
-  }
-
-  protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
+  private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
     <td class="progress-cell">
       {
-      SparkUIUtils.makeProgressBar(
-        started = batch.numActiveOutputOp,
-        completed = batch.numCompletedOutputOp,
-        failed = batch.numFailedOutputOp,
-        skipped = 0,
-        reasonToNumKilled = Map.empty,
-        total = batch.outputOperations.size)
+        SparkUIUtils.makeProgressBar(
+          started = batch.numActiveOutputOp,
+          completed = batch.numCompletedOutputOp,
+          failed = batch.numFailedOutputOp,
+          skipped = 0,
+          reasonToNumKilled = Map.empty,
+          total = batch.outputOperations.size)
       }
     </td>
   }
 
-  /**
-   * Return HTML for all rows of this table.
-   */
-  protected def renderRows: Seq[Node]
-}
+  override def tableId: String = s"$tableTag-table"
 
-private[ui] class ActiveBatchTable(
-    runningBatches: Seq[BatchUIData],
-    waitingBatches: Seq[BatchUIData],
-    batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
+  override def tableCssClass: String =
+    "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
 
-  private val firstFailureReason = getFirstFailureReason(runningBatches)
+  override def pageSizeFormField: String = s"$tableTag.pageSize"
 
-  override protected def columns: Seq[Node] = super.columns ++ {
-    <th>Output Ops: Succeeded/Total</th>
-      <th>Status</th> ++ {
-      if (firstFailureReason.nonEmpty) {
-        <th>Error</th>
-      } else {
-        Nil
-      }
-    }
-  }
+  override def pageNumberFormField: String = s"$tableTag.page"
 
-  override protected def renderRows: Seq[Node] = {
-    // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
-    // waiting batches before running batches
-    waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++
-      runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
+  override def pageLink(page: Int): String = {
+    parameterPath +
+    s"&$tableTag.sort=$encodedSortColumn" +
+    s"&$tableTag.desc=$desc" +
+    s"&$pageNumberFormField=$page" +
+    s"&$pageSizeFormField=$pageSize" +
+    s"#$tableTag"
   }
 
-  private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
-    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
-      if (firstFailureReason.nonEmpty) {
-        getFirstFailureTableCell(batch)
-      } else {
-        Nil
+  override def goButtonFormPath: String =
+    s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag"
+
+  override def dataSource: PagedDataSource[BatchUIData] =
+    new StreamingDataSource(batches, pageSize, sortColumn, desc)
+
+  override def headers: Seq[Node] = {
+    // headers, sortable and tooltips
+    val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
+      Seq(
+        ("Batch Time", true, None),
+        ("Records", true, None),
+        ("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " +
+          "of a batch")),
+        ("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ {
+        if (tableTag.equals("completedBatches")) {
+          Seq(
+            ("Total Delay", true, Some("Total time taken to handle a batch")),
+            ("Output Ops: Succeeded/Total", false, None))
+        } else {
+          Seq(
+            ("Output Ops: Succeeded/Total", false, None),
+            ("Status", false, None))
+        }
+      } ++ {
+        if (firstFailureReason.nonEmpty) {
+          Seq(("Error", false, None))
+        } else {
+          Nil
+        }
       }
     }
+    // check if sort column is a valid sortable column
+    isSortColumnValid(headersAndCssClasses, sortColumn)
+
+    headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
   }
 
-  private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
-    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>++ {
-      if (firstFailureReason.nonEmpty) {
-        // Waiting batches have not run yet, so must have no failure reasons.
-        <td>-</td>
-      } else {
-        Nil
+  override def row(batch: BatchUIData): Seq[Node] = {
+    val batchTime = batch.batchTime.milliseconds
+    val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
+    val numRecords = batch.numRecords
+    val schedulingDelay = batch.schedulingDelay
+    val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+    val processingTime = batch.processingDelay
+    val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
+    val batchTimeId = s"batch-$batchTime"
+    val totalDelay = batch.totalDelay
+    val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+
+    <tr>
+      <td id={batchTimeId} isFailed={batch.isFailed.toString}>
+        <a href={s"batch?id=$batchTime"}>
+          {formattedBatchTime}
+        </a>
+      </td>
+      <td> {numRecords.toString} records </td>
+      <td> {formattedSchedulingDelay} </td>
+      <td> {formattedProcessingTime} </td>
+      {
+        if (tableTag.equals("completedBatches")) {
+          <td> {formattedTotalDelay} </td> ++
+          createOutputOperationProgressBar(batch) ++ {
+            if (firstFailureReason.nonEmpty) {
+              getFirstFailureTableCell(batch)
+            } else {
+              Nil
+            }
+          }
+        } else if (tableTag.equals("runningBatches")) {
+          createOutputOperationProgressBar(batch) ++
+          <td> processing </td>  ++ {
+            if (firstFailureReason.nonEmpty) {
+              getFirstFailureTableCell(batch)
+            } else {
+              Nil
+            }
+          }
+        } else {
+          createOutputOperationProgressBar(batch) ++
+          <td> queued </td> ++ {
+            if (firstFailureReason.nonEmpty) {
+              // Waiting batches have not run yet, so must have no failure reasons.
+              <td>-</td>
+            } else {
+              Nil
+            }
+          }
+        }
       }
-    }
+    </tr>
   }
 }
 
-private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
-  extends BatchTableBase("completed-batches-table", batchInterval) {
+private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String,
+    desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) {
 
-  private val firstFailureReason = getFirstFailureReason(batches)
+  private val data = info.sorted(ordering(sortColumn, desc))
 
-  override protected def columns: Seq[Node] = super.columns ++ {
-    <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
-      <th>Output Ops: Succeeded/Total</th> ++ {
-      if (firstFailureReason.nonEmpty) {
-        <th>Error</th>
-      } else {
-        Nil
-      }
-    }
-  }
+  override protected def dataSize: Int = data.size
 
-  override protected def renderRows: Seq[Node] = {
-    batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
-  }
+  override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to)
 
-  private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
-    val totalDelay = batch.totalDelay
-    val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
-
-    baseRow(batch) ++ {
-      <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
-        {formattedTotalDelay}
-      </td>
-    } ++ createOutputOperationProgressBar(batch)++ {
-      if (firstFailureReason.nonEmpty) {
-        getFirstFailureTableCell(batch)
-      } else {
-        Nil
-      }
+  private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = {
+    val ordering: Ordering[BatchUIData] = column match {
+      case "Batch Time" => Ordering.by(_.batchTime.milliseconds)
+      case "Records" => Ordering.by(_.numRecords)
+      case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
+      case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue))
+      case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue))
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
+    } else {
+      ordering
     }
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 3bdf009..42d0e50 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -20,10 +20,12 @@ package org.apache.spark.streaming.ui
 import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
+import scala.collection.mutable
 import scala.xml.{Node, Unparsed}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
+import org.apache.spark.util.Utils
 
 /**
  * A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
@@ -86,7 +88,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
       onClickTimelineFunc ++ basicInfo ++
       listener.synchronized {
         generateStatTable() ++
-          generateBatchListTables()
+          generateBatchListTables(request)
       }
     SparkUIUtils.headerSparkPage(request, "Streaming Statistics", content, parent)
   }
@@ -432,50 +434,97 @@ private[ui] class StreamingPage(parent: StreamingTab)
     </tr>
   }
 
-  private def generateBatchListTables(): Seq[Node] = {
+  private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData],
+      tableTag: String): Seq[Node] = {
+    val interval: Long = listener.batchDuration
+    val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)
+
+    try {
+      new StreamingPagedTable(
+        request,
+        tableTag,
+        batches,
+        SparkUIUtils.prependBaseUri(request, parent.basePath),
+        "streaming",
+        interval
+      ).table(streamingPage)
+    } catch {
+      case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+        <div class="alert alert-error">
+          <p>Error while rendering streaming table:</p>
+          <pre>
+            {Utils.exceptionString(e)}
+          </pre>
+        </div>
+    }
+  }
+
+  private def generateBatchListTables(request: HttpServletRequest): Seq[Node] = {
     val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse
     val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse
     val completedBatches = listener.retainedCompletedBatches.
       sortBy(_.batchTime.milliseconds).reverse
 
-    val activeBatchesContent = {
-      <div class="row">
-        <div class="col-12">
-          <span id="activeBatches" class="collapse-aggregated-activeBatches collapse-table"
-                onClick="collapseTable('collapse-aggregated-activeBatches',
-                'aggregated-activeBatches')">
-            <h4>
-              <span class="collapse-table-arrow arrow-open"></span>
-              <a>Active Batches ({runningBatches.size + waitingBatches.size})</a>
-            </h4>
-          </span>
-          <div class="aggregated-activeBatches collapsible-table">
-            {new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq}
+    val content = mutable.ListBuffer[Node]()
+
+    if (runningBatches.nonEmpty) {
+      content ++=
+        <div class="row">
+          <div class="col-12">
+            <span id="runningBatches" class="collapse-aggregated-runningBatches collapse-table"
+                  onClick="collapseTable('collapse-aggregated-runningBatches',
+                  'aggregated-runningBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Running Batches ({runningBatches.size})</a>
+              </h4>
+            </span>
+            <div class="aggregated-runningBatches collapsible-table">
+              { streamingTable(request, runningBatches, "runningBatches") }
+            </div>
           </div>
         </div>
-      </div>
     }
 
-    val completedBatchesContent = {
-      <div class="row">
-        <div class="col-12">
-          <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
-                onClick="collapseTable('collapse-aggregated-completedBatches',
-                'aggregated-completedBatches')">
-            <h4>
-              <span class="collapse-table-arrow arrow-open"></span>
-              <a>Completed Batches (last {completedBatches.size}
-                out of {listener.numTotalCompletedBatches})</a>
-            </h4>
-          </span>
-          <div class="aggregated-completedBatches collapsible-table">
-            {new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq}
+    if (waitingBatches.nonEmpty) {
+      content ++=
+        <div class="row">
+          <div class="col-12">
+            <span id="waitingBatches" class="collapse-aggregated-waitingBatches collapse-table"
+                  onClick="collapseTable('collapse-aggregated-waitingBatches',
+                  'aggregated-waitingBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Waiting Batches ({waitingBatches.size})</a>
+              </h4>
+            </span>
+            <div class="aggregated-waitingBatches collapsible-table">
+              { streamingTable(request, waitingBatches, "waitingBatches") }
+            </div>
           </div>
         </div>
-      </div>
     }
 
-    activeBatchesContent ++ completedBatchesContent
+    if (completedBatches.nonEmpty) {
+      content ++=
+        <div class="row">
+          <div class="col-12">
+            <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
+                  onClick="collapseTable('collapse-aggregated-completedBatches',
+                  'aggregated-completedBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Completed Batches (last {completedBatches.size}
+                  out of {listener.numTotalCompletedBatches})</a>
+              </h4>
+            </span>
+            <div class="aggregated-completedBatches collapsible-table">
+              { streamingTable(request, completedBatches, "completedBatches") }
+            </div>
+          </div>
+        </div>
+    }
+    content
   }
 }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index bdc9e9e..7041e46 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -63,7 +63,7 @@ class UISeleniumSuite
       .setMaster("local")
       .setAppName("test")
       .set(UI_ENABLED, true)
-    val ssc = new StreamingContext(conf, Seconds(1))
+    val ssc = new StreamingContext(conf, Milliseconds(100))
     assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
     ssc
   }
@@ -104,7 +104,7 @@ class UISeleniumSuite
         find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
       }
 
-      eventually(timeout(10.seconds), interval(50.milliseconds)) {
+      eventually(timeout(10.seconds), interval(500.milliseconds)) {
         // check whether streaming page exists
         go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
         val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@@ -125,24 +125,37 @@ class UISeleniumSuite
 
         // Check batch tables
         val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
-        h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
         h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
 
-        findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
-          List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
-            "Output Ops: Succeeded/Total", "Status")
-        }
-        findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
-          List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
-            "Total Delay (?)", "Output Ops: Succeeded/Total")
+        val arrow = 0x25BE.toChar
+        findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be {
+          List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
+            "Total Delay", "Output Ops: Succeeded/Total")
         }
 
-        val batchLinks =
-          findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq
+        val pageSize = 1
+        val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" +
+          "&completedBatches.desc=true&completedBatches.page=1" +
+          s"&completedBatches.pageSize=$pageSize#completedBatches"
+
+        go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath)
+        val completedTableRows = findAll(cssSelector("""#completedBatches-table tr"""))
+          .map(_.text).toList
+        // header row + pagesize
+        completedTableRows.length should be (1 + pageSize)
+
+        val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" +
+          s"&completedBatches.desc=false&completedBatches.pageSize=$pageSize#completedBatches"
+
+        // sort batches in ascending order of batch time
+        go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath)
+
+        val batchLinks = findAll(cssSelector("""#completedBatches-table td a"""))
+          .flatMap(_.attribute("href")).toSeq
         batchLinks.size should be >= 1
 
         // Check a normal batch page
-        go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
+        go to (batchLinks.head) // Head is the first batch, so it will have some jobs
         val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
         summaryText should contain ("Batch Duration:")
         summaryText should contain ("Input data size:")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org