Posted to commits@spark.apache.org by sa...@apache.org on 2020/06/07 04:11:29 UTC

[spark] branch master updated: [SPARK-30119][WEBUI] Add Pagination Support to Streaming Page

This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e9337f5  [SPARK-30119][WEBUI] Add Pagination Support to Streaming Page
e9337f5 is described below

commit e9337f505b737f4501d4173baa9c5739626b06a8
Author: iRakson <ra...@gmail.com>
AuthorDate: Sun Jun 7 13:08:50 2020 +0900

    [SPARK-30119][WEBUI] Add Pagination Support to Streaming Page
    
    ### What changes were proposed in this pull request?
    * Pagination support is added to all tables of the streaming page in the Spark web UI.
    For adding pagination support, existing classes from #7399 were reused.
    * Earlier, the streaming page had two tables, `Active Batches` and `Completed Batches`. Now there are three tables: `Running Batches`, `Waiting Batches` and `Completed Batches`. With a large number of waiting and running batches, keeping track of them in a single table is difficult; other pages also use a separate table for each type of data.
    * Earlier, empty tables were shown as well; now only non-empty tables are rendered.
    The `Active Batches` table used to show details of waiting batches followed by running batches.
    
    ### Why are the changes needed?
    Pagination will let users analyse the tables much more easily. All Spark web UI pages apart from the streaming pages already support pagination, so this adds consistency as well. It might also prevent the potential OOM errors that can arise from rendering very large tables at once.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. The `Active Batches` table is split into two tables, `Running Batches` and `Waiting Batches`. Pagination support is added to all the tables. Every other functionality is unchanged.
    
    ### How was this patch tested?
    Manually.
    
    Before changes:
    <img width="1667" alt="Screenshot 2020-05-03 at 7 07 14 PM" src="https://user-images.githubusercontent.com/15366835/80915680-8fb44b80-8d71-11ea-9957-c4a3769b8b67.png">
    
    After changes:
    <img width="1669" alt="Screenshot 2020-05-03 at 6 51 22 PM" src="https://user-images.githubusercontent.com/15366835/80915694-a9ee2980-8d71-11ea-8fc5-246413a4951d.png">
    
    Closes #28439 from iRakson/streamingPagination.
    
    Authored-by: iRakson <ra...@gmail.com>
    Signed-off-by: Kousuke Saruta <sa...@oss.nttdata.com>
---
 .../resources/org/apache/spark/ui/static/webui.js  |   3 +-
 .../spark/streaming/ui/AllBatchesTable.scala       | 282 +++++++++++----------
 .../apache/spark/streaming/ui/StreamingPage.scala  | 113 ++++++---
 .../apache/spark/streaming/UISeleniumSuite.scala   |  45 +++-
 4 files changed, 267 insertions(+), 176 deletions(-)
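
For readers unfamiliar with the pagination framework from #7399 that this
commit reuses, here is a minimal, self-contained sketch of the paging core
that StreamingPagedTable and StreamingDataSource plug into below.
SimplePagedDataSource and SeqSource are hypothetical stand-ins, not Spark's
actual classes; the real PagedTable and PagedDataSource in org.apache.spark.ui
also handle sorting, headers and link generation.

    // A simplified analogue of PagedDataSource: a subclass reports the total
    // size of its data and how to slice one page out of it.
    abstract class SimplePagedDataSource[T](pageSize: Int) {
      protected def dataSize: Int
      protected def sliceData(from: Int, to: Int): Seq[T]

      // Rows for a 1-indexed page, clamped to the available data.
      final def pageData(page: Int): Seq[T] = {
        val from = (page - 1) * pageSize
        require(from >= 0 && from < dataSize, s"Page $page is out of range")
        sliceData(from, math.min(from + pageSize, dataSize))
      }
    }

    // Usage: page through batch times, newest first.
    class SeqSource(data: Seq[Long], pageSize: Int)
        extends SimplePagedDataSource[Long](pageSize) {
      override protected def dataSize: Int = data.size
      override protected def sliceData(from: Int, to: Int): Seq[Long] =
        data.slice(from, to)
    }

    object PagingDemo extends App {
      val src = new SeqSource(Seq(50L, 40L, 30L, 20L, 10L), pageSize = 2)
      println(src.pageData(1)) // List(50, 40)
      println(src.pageData(3)) // List(10)
    }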

diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js
index 4f8409c..bb37256 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -87,7 +87,8 @@ $(function() {
   collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
   collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
   collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
-  collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
+  collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
+  collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
   collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
   collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
   collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 1e443f6..c0eec0e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -17,30 +17,41 @@
 
 package org.apache.spark.streaming.ui
 
-import scala.xml.Node
-
-import org.apache.spark.ui.{UIUtils => SparkUIUtils}
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
+import javax.servlet.http.HttpServletRequest
 
-private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
+import scala.xml.Node
 
-  protected def columns: Seq[Node] = {
-    <th>Batch Time</th>
-      <th>Records</th>
-      <th>Scheduling Delay
-        {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}
-      </th>
-      <th>Processing Time
-        {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
-  }
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}
+
+private[ui] class StreamingPagedTable(
+    request: HttpServletRequest,
+    tableTag: String,
+    batches: Seq[BatchUIData],
+    basePath: String,
+    subPath: String,
+    batchInterval: Long) extends PagedTable[BatchUIData] {
+
+  private val (sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
+  private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
+  private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+  private val firstFailureReason: Option[String] =
+    if (!tableTag.equals("waitingBatches")) {
+      getFirstFailureReason(batches)
+    } else {
+      None
+    }
 
   /**
    * Return the first failure reason if one is found in the batches.
    */
-  protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
+  private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
     batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
   }
 
-  protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
+  private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
     val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
     firstFailureReason.map { failureReason =>
       val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
@@ -49,147 +60,154 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
     }.getOrElse(<td>-</td>)
   }
 
-  protected def baseRow(batch: BatchUIData): Seq[Node] = {
-    val batchTime = batch.batchTime.milliseconds
-    val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
-    val numRecords = batch.numRecords
-    val schedulingDelay = batch.schedulingDelay
-    val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
-    val processingTime = batch.processingDelay
-    val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
-    val batchTimeId = s"batch-$batchTime"
-
-    <td id={batchTimeId} sorttable_customkey={batchTime.toString}
-        isFailed={batch.isFailed.toString}>
-      <a href={s"batch?id=$batchTime"}>
-        {formattedBatchTime}
-      </a>
-    </td>
-      <td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
-      <td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
-        {formattedSchedulingDelay}
-      </td>
-      <td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}>
-        {formattedProcessingTime}
-      </td>
-  }
-
-  private def batchTable: Seq[Node] = {
-    <table id={tableId} class="table table-bordered table-striped table-sm sortable">
-      <thead>
-        {columns}
-      </thead>
-      <tbody>
-        {renderRows}
-      </tbody>
-    </table>
-  }
-
-  def toNodeSeq: Seq[Node] = {
-    batchTable
-  }
-
-  protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
+  private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
     <td class="progress-cell">
       {
-      SparkUIUtils.makeProgressBar(
-        started = batch.numActiveOutputOp,
-        completed = batch.numCompletedOutputOp,
-        failed = batch.numFailedOutputOp,
-        skipped = 0,
-        reasonToNumKilled = Map.empty,
-        total = batch.outputOperations.size)
+        SparkUIUtils.makeProgressBar(
+          started = batch.numActiveOutputOp,
+          completed = batch.numCompletedOutputOp,
+          failed = batch.numFailedOutputOp,
+          skipped = 0,
+          reasonToNumKilled = Map.empty,
+          total = batch.outputOperations.size)
       }
     </td>
   }
 
-  /**
-   * Return HTML for all rows of this table.
-   */
-  protected def renderRows: Seq[Node]
-}
+  override def tableId: String = s"$tableTag-table"
 
-private[ui] class ActiveBatchTable(
-    runningBatches: Seq[BatchUIData],
-    waitingBatches: Seq[BatchUIData],
-    batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
+  override def tableCssClass: String =
+    "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
 
-  private val firstFailureReason = getFirstFailureReason(runningBatches)
+  override def pageSizeFormField: String = s"$tableTag.pageSize"
 
-  override protected def columns: Seq[Node] = super.columns ++ {
-    <th>Output Ops: Succeeded/Total</th>
-      <th>Status</th> ++ {
-      if (firstFailureReason.nonEmpty) {
-        <th>Error</th>
-      } else {
-        Nil
-      }
-    }
-  }
+  override def pageNumberFormField: String = s"$tableTag.page"
 
-  override protected def renderRows: Seq[Node] = {
-    // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
-    // waiting batches before running batches
-    waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++
-      runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
+  override def pageLink(page: Int): String = {
+    parameterPath +
+    s"&$tableTag.sort=$encodedSortColumn" +
+    s"&$tableTag.desc=$desc" +
+    s"&$pageNumberFormField=$page" +
+    s"&$pageSizeFormField=$pageSize" +
+    s"#$tableTag"
   }
 
-  private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
-    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
-      if (firstFailureReason.nonEmpty) {
-        getFirstFailureTableCell(batch)
-      } else {
-        Nil
+  override def goButtonFormPath: String =
+    s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag"
+
+  override def dataSource: PagedDataSource[BatchUIData] =
+    new StreamingDataSource(batches, pageSize, sortColumn, desc)
+
+  override def headers: Seq[Node] = {
+    // (header name, whether the column is sortable, optional tooltip)
+    val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
+      Seq(
+        ("Batch Time", true, None),
+        ("Records", true, None),
+        ("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " +
+          "of a batch")),
+        ("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ {
+        if (tableTag.equals("completedBatches")) {
+          Seq(
+            ("Total Delay", true, Some("Total time taken to handle a batch")),
+            ("Output Ops: Succeeded/Total", false, None))
+        } else {
+          Seq(
+            ("Output Ops: Succeeded/Total", false, None),
+            ("Status", false, None))
+        }
+      } ++ {
+        if (firstFailureReason.nonEmpty) {
+          Seq(("Error", false, None))
+        } else {
+          Nil
+        }
       }
     }
+    // check if sort column is a valid sortable column
+    isSortColumnValid(headersAndCssClasses, sortColumn)
+
+    headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
   }
 
-  private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
-    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>++ {
-      if (firstFailureReason.nonEmpty) {
-        // Waiting batches have not run yet, so must have no failure reasons.
-        <td>-</td>
-      } else {
-        Nil
+  override def row(batch: BatchUIData): Seq[Node] = {
+    val batchTime = batch.batchTime.milliseconds
+    val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
+    val numRecords = batch.numRecords
+    val schedulingDelay = batch.schedulingDelay
+    val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+    val processingTime = batch.processingDelay
+    val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
+    val batchTimeId = s"batch-$batchTime"
+    val totalDelay = batch.totalDelay
+    val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+
+    <tr>
+      <td id={batchTimeId} isFailed={batch.isFailed.toString}>
+        <a href={s"batch?id=$batchTime"}>
+          {formattedBatchTime}
+        </a>
+      </td>
+      <td> {numRecords.toString} records </td>
+      <td> {formattedSchedulingDelay} </td>
+      <td> {formattedProcessingTime} </td>
+      {
+        if (tableTag.equals("completedBatches")) {
+          <td> {formattedTotalDelay} </td> ++
+          createOutputOperationProgressBar(batch) ++ {
+            if (firstFailureReason.nonEmpty) {
+              getFirstFailureTableCell(batch)
+            } else {
+              Nil
+            }
+          }
+        } else if (tableTag.equals("runningBatches")) {
+          createOutputOperationProgressBar(batch) ++
+          <td> processing </td> ++ {
+            if (firstFailureReason.nonEmpty) {
+              getFirstFailureTableCell(batch)
+            } else {
+              Nil
+            }
+          }
+        } else {
+          createOutputOperationProgressBar(batch) ++
+          <td> queued </td> ++ {
+            if (firstFailureReason.nonEmpty) {
+              // Waiting batches have not run yet, so must have no failure reasons.
+              <td>-</td>
+            } else {
+              Nil
+            }
+          }
+        }
       }
-    }
+    </tr>
   }
 }
 
-private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
-  extends BatchTableBase("completed-batches-table", batchInterval) {
+private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String,
+    desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) {
 
-  private val firstFailureReason = getFirstFailureReason(batches)
+  private val data = info.sorted(ordering(sortColumn, desc))
 
-  override protected def columns: Seq[Node] = super.columns ++ {
-    <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
-      <th>Output Ops: Succeeded/Total</th> ++ {
-      if (firstFailureReason.nonEmpty) {
-        <th>Error</th>
-      } else {
-        Nil
-      }
-    }
-  }
+  override protected def dataSize: Int = data.size
 
-  override protected def renderRows: Seq[Node] = {
-    batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
-  }
+  override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to)
 
-  private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
-    val totalDelay = batch.totalDelay
-    val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
-
-    baseRow(batch) ++ {
-      <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
-        {formattedTotalDelay}
-      </td>
-    } ++ createOutputOperationProgressBar(batch)++ {
-      if (firstFailureReason.nonEmpty) {
-        getFirstFailureTableCell(batch)
-      } else {
-        Nil
-      }
+  private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = {
+    val ordering: Ordering[BatchUIData] = column match {
+      case "Batch Time" => Ordering.by(_.batchTime.milliseconds)
+      case "Records" => Ordering.by(_.numRecords)
+      case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
+      case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue))
+      case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue))
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
+    } else {
+      ordering
     }
   }
 }
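
A note on the sorting scheme in StreamingDataSource above: the column name from
the request is mapped to an Ordering, reversed when descending order is asked
for, and the whole sequence is sorted once before pages are sliced out of it.
A standalone sketch, with a hypothetical BatchStub standing in for BatchUIData:

    case class BatchStub(batchTime: Long, numRecords: Long, schedulingDelay: Option[Long])

    def ordering(column: String, desc: Boolean): Ordering[BatchStub] = {
      val base: Ordering[BatchStub] = column match {
        case "Batch Time" => Ordering.by(_.batchTime)
        case "Records" => Ordering.by(_.numRecords)
        // Batches with no recorded delay sort last in ascending order.
        case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
        case unknown => throw new IllegalArgumentException(s"Unknown Column: $unknown")
      }
      if (desc) base.reverse else base
    }

    val batches = Seq(
      BatchStub(1000L, 5L, Some(20L)),
      BatchStub(2000L, 3L, None),
      BatchStub(3000L, 9L, Some(5L)))
    // Newest first, as on the streaming page.
    val newestFirst = batches.sorted(ordering("Batch Time", desc = true))
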
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 3bdf009..42d0e50 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -20,10 +20,12 @@ package org.apache.spark.streaming.ui
 import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
+import scala.collection.mutable
 import scala.xml.{Node, Unparsed}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
+import org.apache.spark.util.Utils
 
 /**
  * A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
@@ -86,7 +88,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
       onClickTimelineFunc ++ basicInfo ++
       listener.synchronized {
         generateStatTable() ++
-          generateBatchListTables()
+          generateBatchListTables(request)
       }
     SparkUIUtils.headerSparkPage(request, "Streaming Statistics", content, parent)
   }
@@ -432,50 +434,97 @@ private[ui] class StreamingPage(parent: StreamingTab)
     </tr>
   }
 
-  private def generateBatchListTables(): Seq[Node] = {
+  private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData],
+      tableTag: String): Seq[Node] = {
+    val interval: Long = listener.batchDuration
+    val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)
+
+    try {
+      new StreamingPagedTable(
+        request,
+        tableTag,
+        batches,
+        SparkUIUtils.prependBaseUri(request, parent.basePath),
+        "streaming",
+        interval
+      ).table(streamingPage)
+    } catch {
+      case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+        <div class="alert alert-error">
+          <p>Error while rendering streaming table:</p>
+          <pre>
+            {Utils.exceptionString(e)}
+          </pre>
+        </div>
+    }
+  }
+
+  private def generateBatchListTables(request: HttpServletRequest): Seq[Node] = {
     val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse
     val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse
     val completedBatches = listener.retainedCompletedBatches.
       sortBy(_.batchTime.milliseconds).reverse
 
-    val activeBatchesContent = {
-      <div class="row">
-        <div class="col-12">
-          <span id="activeBatches" class="collapse-aggregated-activeBatches collapse-table"
-                onClick="collapseTable('collapse-aggregated-activeBatches',
-                'aggregated-activeBatches')">
-            <h4>
-              <span class="collapse-table-arrow arrow-open"></span>
-              <a>Active Batches ({runningBatches.size + waitingBatches.size})</a>
-            </h4>
-          </span>
-          <div class="aggregated-activeBatches collapsible-table">
-            {new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq}
+    val content = mutable.ListBuffer[Node]()
+
+    if (runningBatches.nonEmpty) {
+      content ++=
+        <div class="row">
+          <div class="col-12">
+            <span id="runningBatches" class="collapse-aggregated-runningBatches collapse-table"
+                  onClick="collapseTable('collapse-aggregated-runningBatches',
+                  'aggregated-runningBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Running Batches ({runningBatches.size})</a>
+              </h4>
+            </span>
+            <div class="aggregated-runningBatches collapsible-table">
+              { streamingTable(request, runningBatches, "runningBatches") }
+            </div>
           </div>
         </div>
-      </div>
     }
 
-    val completedBatchesContent = {
-      <div class="row">
-        <div class="col-12">
-          <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
-                onClick="collapseTable('collapse-aggregated-completedBatches',
-                'aggregated-completedBatches')">
-            <h4>
-              <span class="collapse-table-arrow arrow-open"></span>
-              <a>Completed Batches (last {completedBatches.size}
-                out of {listener.numTotalCompletedBatches})</a>
-            </h4>
-          </span>
-          <div class="aggregated-completedBatches collapsible-table">
-            {new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq}
+    if (waitingBatches.nonEmpty) {
+      content ++=
+        <div class="row">
+          <div class="col-12">
+            <span id="waitingBatches" class="collapse-aggregated-waitingBatches collapse-table"
+                  onClick="collapseTable('collapse-aggregated-waitingBatches',
+                  'aggregated-waitingBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Waiting Batches ({waitingBatches.size})</a>
+              </h4>
+            </span>
+            <div class="aggregated-waitingBatches collapsible-table">
+              { streamingTable(request, waitingBatches, "waitingBatches") }
+            </div>
           </div>
         </div>
-      </div>
     }
 
-    activeBatchesContent ++ completedBatchesContent
+    if (completedBatches.nonEmpty) {
+      content ++=
+        <div class="row">
+          <div class="col-12">
+            <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
+                  onClick="collapseTable('collapse-aggregated-completedBatches',
+                  'aggregated-completedBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Completed Batches (last {completedBatches.size}
+                  out of {listener.numTotalCompletedBatches})</a>
+              </h4>
+            </span>
+            <div class="aggregated-completedBatches collapsible-table">
+              { streamingTable(request, completedBatches, "completedBatches") }
+            </div>
+          </div>
+        </div>
+    }
+    content
   }
 }
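
The per-table URL scheme that pageLink builds (and that the test changes below
exercise) namespaces every query parameter with the table tag, so the three
paged tables can coexist on one page without clobbering each other's state. A
simplified sketch of that assembly, with an illustrative host and values (the
real pageLink also carries over the other tables' parameters):

    import java.net.URLEncoder
    import java.nio.charset.StandardCharsets.UTF_8

    def pageLink(basePath: String, tableTag: String, sortColumn: String,
        desc: Boolean, page: Int, pageSize: Int): String = {
      val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
      s"$basePath/streaming/?" +
        s"$tableTag.sort=$encodedSortColumn" +
        s"&$tableTag.desc=$desc" +
        s"&$tableTag.page=$page" +
        s"&$tableTag.pageSize=$pageSize" +
        s"#$tableTag"
    }

    // Prints: http://localhost:4040/streaming/?completedBatches.sort=Batch+Time
    //   &completedBatches.desc=true&completedBatches.page=1
    //   &completedBatches.pageSize=20#completedBatches
    println(pageLink("http://localhost:4040", "completedBatches",
      "Batch Time", desc = true, page = 1, pageSize = 20))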
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index bdc9e9e..952ef6c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -63,7 +63,7 @@ class UISeleniumSuite
       .setMaster("local")
       .setAppName("test")
       .set(UI_ENABLED, true)
-    val ssc = new StreamingContext(conf, Seconds(1))
+    val ssc = new StreamingContext(conf, Milliseconds(100))
     assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
     ssc
   }
@@ -104,7 +104,7 @@ class UISeleniumSuite
         find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
       }
 
-      eventually(timeout(10.seconds), interval(50.milliseconds)) {
+      eventually(timeout(10.seconds), interval(500.milliseconds)) {
         // check whether streaming page exists
         go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
         val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@@ -125,24 +125,47 @@ class UISeleniumSuite
 
         // Check batch tables
         val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
-        h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
+        h4Text.exists(_.matches("Running Batches \\(\\d+\\)")) should be (true)
+        h4Text.exists(_.matches("Waiting Batches \\(\\d+\\)")) should be (true)
         h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
 
-        findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
-          List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
+        val arrow = 0x25BE.toChar
+        findAll(cssSelector("""#runningBatches-table th""")).map(_.text).toList should be {
+          List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
+            "Output Ops: Succeeded/Total", "Status")
+        }
+        findAll(cssSelector("""#waitingBatches-table th""")).map(_.text).toList should be {
+          List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
             "Output Ops: Succeeded/Total", "Status")
         }
-        findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
-          List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
-            "Total Delay (?)", "Output Ops: Succeeded/Total")
+        findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be {
+          List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
+            "Total Delay", "Output Ops: Succeeded/Total")
         }
 
-        val batchLinks =
-          findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq
+        val pageSize = 3
+        val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" +
+          "&completedBatches.desc=true&completedBatches.page=1" +
+          s"&completedBatches.pageSize=$pageSize#completedBatches"
+
+        go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath)
+        val completedTableRows = findAll(cssSelector("""#completedBatches-table tr"""))
+          .map(_.text).toList
+        // one header row plus pageSize data rows
+        completedTableRows.length should be (1 + pageSize)
+
+        val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" +
+          "&completedBatches.desc=false&completedBatches.pageSize=3#completedBatches"
+
+        // sort batches in ascending order of batch time
+        go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath)
+
+        val batchLinks = findAll(cssSelector("""#completedBatches-table td a"""))
+          .flatMap(_.attribute("href")).toSeq
         batchLinks.size should be >= 1
 
         // Check a normal batch page
-        go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
+        go to (batchLinks.head) // Head is the first batch, so it will have some jobs
         val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
         summaryText should contain ("Batch Duration:")
         summaryText should contain ("Input data size:")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org