You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/07/21 18:54:42 UTC

spark git commit: [SPARK-4598] [WEBUI] Task table pagination for the Stage page

Repository: spark
Updated Branches:
  refs/heads/master 31954910d -> 4f7f1ee37


[SPARK-4598] [WEBUI] Task table pagination for the Stage page

This PR adds pagination for the task table to solve the scalability issue of the stage page. Here is the initial screenshot:
<img width="1347" alt="pagination" src="https://cloud.githubusercontent.com/assets/1000778/8679669/9e63863c-2a8e-11e5-94e4-994febcd6717.png">
The task table shows only 100 tasks per page by default. There is a page navigation bar above the table. Users can click a page link or type a page number to jump to another page. The table can be sorted by clicking the headers. However, unlike the previous implementation, sorting is now done on the server, so clicking a column header to sort reloads the web page.

Author: zsxwing <zs...@gmail.com>

Closes #7399 from zsxwing/task-table-pagination and squashes the following commits:

144f513 [zsxwing] Display the page navigation when the page number is out of range
a3eee22 [zsxwing] Add extra space for the error message
54c5b84 [zsxwing] Reset page to 1 if the user changes the page size
c2f7f39 [zsxwing] Add a text field to let users fill the page size
bad52eb [zsxwing] Display user-friendly error messages
410586b [zsxwing] Scroll down to the tasks table if the url contains any sort column
a0746d1 [zsxwing] Use expand-dag-viz-arrow-job and expand-dag-viz-arrow-stage instead of expand-dag-viz-arrow-true and expand-dag-viz-arrow-false
b123f67 [zsxwing] Use localStorage to remember the user's actions and replay them when loading the page
894a342 [zsxwing] Show the link cursor when hovering for headers and page links and other minor fix
4d4fecf [zsxwing] Address Carson's comments
d9285f0 [zsxwing] Add comments and fix the style
74285fa [zsxwing] Merge branch 'master' into task-table-pagination
db6c859 [zsxwing] Task table pagination for the Stage page


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7f1ee3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7f1ee3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7f1ee3

Branch: refs/heads/master
Commit: 4f7f1ee378e80b33686508d56e133fc25dec5316
Parents: 3195491
Author: zsxwing <zs...@gmail.com>
Authored: Tue Jul 21 09:54:39 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Jul 21 09:54:39 2015 -0700

----------------------------------------------------------------------
 .../spark/ui/static/additional-metrics.js       |  34 +-
 .../org/apache/spark/ui/static/spark-dag-viz.js |  27 +
 .../org/apache/spark/ui/static/timeline-view.js |  39 +
 .../scala/org/apache/spark/ui/PagedTable.scala  | 246 ++++++
 .../org/apache/spark/ui/jobs/StagePage.scala    | 879 ++++++++++++++-----
 .../org/apache/spark/ui/PagedTableSuite.scala   |  99 +++
 6 files changed, 1102 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1ee3/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js b/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js
index 0b450dc..3c8dddd 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js
@@ -19,6 +19,9 @@
  * to be registered after the page loads. */
 $(function() {
     $("span.expand-additional-metrics").click(function(){
+        var status = window.localStorage.getItem("expand-additional-metrics") == "true";
+        status = !status;
+
         // Expand the list of additional metrics.
         var additionalMetricsDiv = $(this).parent().find('.additional-metrics');
         $(additionalMetricsDiv).toggleClass('collapsed');
@@ -26,17 +29,31 @@ $(function() {
         // Switch the class of the arrow from open to closed.
         $(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-open');
         $(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-closed');
+
+        window.localStorage.setItem("expand-additional-metrics", "" + status);
     });
 
+    if (window.localStorage.getItem("expand-additional-metrics") == "true") {
+        // Set it to false so that the click function can revert it
+        window.localStorage.setItem("expand-additional-metrics", "false");
+        $("span.expand-additional-metrics").trigger("click");
+    }
+
     stripeSummaryTable();
 
     $('input[type="checkbox"]').click(function() {
-        var column = "table ." + $(this).attr("name");
+        var name = $(this).attr("name")
+        var column = "table ." + name;
+        var status = window.localStorage.getItem(name) == "true";
+        status = !status;
         $(column).toggle();
         stripeSummaryTable();
+        window.localStorage.setItem(name, "" + status);
     });
 
     $("#select-all-metrics").click(function() {
+       var status = window.localStorage.getItem("select-all-metrics") == "true";
+       status = !status;
        if (this.checked) {
           // Toggle all un-checked options.
           $('input[type="checkbox"]:not(:checked)').trigger('click');
@@ -44,6 +61,21 @@ $(function() {
           // Toggle all checked options.
           $('input[type="checkbox"]:checked').trigger('click');
        }
+       window.localStorage.setItem("select-all-metrics", "" + status);
+    });
+
+    if (window.localStorage.getItem("select-all-metrics") == "true") {
+        $("#select-all-metrics").attr('checked', status);
+    }
+
+    $("span.additional-metric-title").parent().find('input[type="checkbox"]').each(function() {
+        var name = $(this).attr("name")
+        // If name is undefined, then skip it because it's the "select-all-metrics" checkbox
+        if (name && window.localStorage.getItem(name) == "true") {
+            // Set it to false so that the click function can revert it
+            window.localStorage.setItem(name, "false");
+            $(this).trigger("click")
+        }
     });
 
     // Trigger a click on the checkbox if a user clicks the label next to it.

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1ee3/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
index 9fa53ba..4a893bc 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -73,12 +73,23 @@ var StagePageVizConstants = {
 };
 
 /*
+ * Return "expand-dag-viz-arrow-job" if forJob is true.
+ * Otherwise, return "expand-dag-viz-arrow-stage".
+ */
+function expandDagVizArrowKey(forJob) {
+  return forJob ? "expand-dag-viz-arrow-job" : "expand-dag-viz-arrow-stage";
+}
+
+/*
  * Show or hide the RDD DAG visualization.
  *
  * The graph is only rendered the first time this is called.
  * This is the narrow interface called from the Scala UI code.
  */
 function toggleDagViz(forJob) {
+  var status = window.localStorage.getItem(expandDagVizArrowKey(forJob)) == "true";
+  status = !status;
+
   var arrowSelector = ".expand-dag-viz-arrow";
   $(arrowSelector).toggleClass('arrow-closed');
   $(arrowSelector).toggleClass('arrow-open');
@@ -93,8 +104,24 @@ function toggleDagViz(forJob) {
     // Save the graph for later so we don't have to render it again
     graphContainer().style("display", "none");
   }
+
+  window.localStorage.setItem(expandDagVizArrowKey(forJob), "" + status);
 }
 
+$(function (){
+  if (window.localStorage.getItem(expandDagVizArrowKey(false)) == "true") {
+    // Set it to false so that the click function can revert it
+    window.localStorage.setItem(expandDagVizArrowKey(false), "false");
+    toggleDagViz(false);
+  }
+
+  if (window.localStorage.getItem(expandDagVizArrowKey(true)) == "true") {
+    // Set it to false so that the click function can revert it
+    window.localStorage.setItem(expandDagVizArrowKey(true), "false");
+    toggleDagViz(true);
+  }
+});
+
 /*
  * Render the RDD DAG visualization.
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1ee3/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
index ca74ef9..f4453c7 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
@@ -66,14 +66,27 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) {
   setupJobEventAction();
 
   $("span.expand-application-timeline").click(function() {
+    var status = window.localStorage.getItem("expand-application-timeline") == "true";
+    status = !status;
+
     $("#application-timeline").toggleClass('collapsed');
 
     // Switch the class of the arrow from open to closed.
     $(this).find('.expand-application-timeline-arrow').toggleClass('arrow-open');
     $(this).find('.expand-application-timeline-arrow').toggleClass('arrow-closed');
+
+    window.localStorage.setItem("expand-application-timeline", "" + status);
   });
 }
 
+$(function (){
+  if (window.localStorage.getItem("expand-application-timeline") == "true") {
+    // Set it to false so that the click function can revert it
+    window.localStorage.setItem("expand-application-timeline", "false");
+    $("span.expand-application-timeline").trigger('click');
+  }
+});
+
 function drawJobTimeline(groupArray, eventObjArray, startTime) {
   var groups = new vis.DataSet(groupArray);
   var items = new vis.DataSet(eventObjArray);
@@ -125,14 +138,27 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) {
   setupStageEventAction();
 
   $("span.expand-job-timeline").click(function() {
+    var status = window.localStorage.getItem("expand-job-timeline") == "true";
+    status = !status;
+
     $("#job-timeline").toggleClass('collapsed');
 
     // Switch the class of the arrow from open to closed.
     $(this).find('.expand-job-timeline-arrow').toggleClass('arrow-open');
     $(this).find('.expand-job-timeline-arrow').toggleClass('arrow-closed');
+
+    window.localStorage.setItem("expand-job-timeline", "" + status);
   });
 }
 
+$(function (){
+  if (window.localStorage.getItem("expand-job-timeline") == "true") {
+    // Set it to false so that the click function can revert it
+    window.localStorage.setItem("expand-job-timeline", "false");
+    $("span.expand-job-timeline").trigger('click');
+  }
+});
+
 function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, maxFinishTime) {
   var groups = new vis.DataSet(groupArray);
   var items = new vis.DataSet(eventObjArray);
@@ -176,14 +202,27 @@ function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, ma
   setupZoomable("#task-assignment-timeline-zoom-lock", taskTimeline);
 
   $("span.expand-task-assignment-timeline").click(function() {
+    var status = window.localStorage.getItem("expand-task-assignment-timeline") == "true";
+    status = !status;
+
     $("#task-assignment-timeline").toggleClass("collapsed");
 
      // Switch the class of the arrow from open to closed.
     $(this).find(".expand-task-assignment-timeline-arrow").toggleClass("arrow-open");
     $(this).find(".expand-task-assignment-timeline-arrow").toggleClass("arrow-closed");
+
+    window.localStorage.setItem("expand-task-assignment-timeline", "" + status);
   });
 }
 
+$(function (){
+  if (window.localStorage.getItem("expand-task-assignment-timeline") == "true") {
+    // Set it to false so that the click function can revert it
+    window.localStorage.setItem("expand-task-assignment-timeline", "false");
+    $("span.expand-task-assignment-timeline").trigger('click');
+  }
+});
+
 function setupExecutorEventAction() {
   $(".item.box.executor").each(function () {
     $(this).hover(

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1ee3/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
new file mode 100644
index 0000000..17d7b39
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import scala.xml.{Node, Unparsed}
+
+/**
+ * A data source that provides data for a page.
+ *
+ * @param pageSize the number of rows in a page
+ */
+private[ui] abstract class PagedDataSource[T](val pageSize: Int) {
+
+  if (pageSize <= 0) {
+    throw new IllegalArgumentException("Page size must be positive")
+  }
+
+  /**
+   * Return the size of all data.
+   */
+  protected def dataSize: Int
+
+  /**
+   * Slice a range of data.
+   */
+  protected def sliceData(from: Int, to: Int): Seq[T]
+
+  /**
+   * Slice the data for this page
+   */
+  def pageData(page: Int): PageData[T] = {
+    val totalPages = (dataSize + pageSize - 1) / pageSize
+    if (page <= 0 || page > totalPages) {
+      throw new IndexOutOfBoundsException(
+        s"Page $page is out of range. Please select a page number between 1 and $totalPages.")
+    }
+    val from = (page - 1) * pageSize
+    val to = dataSize.min(page * pageSize)
+    PageData(totalPages, sliceData(from, to))
+  }
+
+}
+
+/**
+ * The data returned by `PagedDataSource.pageData`, including the total number of pages and the
+ * data in this page.
+ */
+private[ui] case class PageData[T](totalPage: Int, data: Seq[T])
+
+/**
+ * A paged table that will generate a HTML table for a specified page and also the page navigation.
+ */
+private[ui] trait PagedTable[T] {
+
+  def tableId: String
+
+  def tableCssClass: String
+
+  def dataSource: PagedDataSource[T]
+
+  def headers: Seq[Node]
+
+  def row(t: T): Seq[Node]
+
+  def table(page: Int): Seq[Node] = {
+    val _dataSource = dataSource
+    try {
+      val PageData(totalPages, data) = _dataSource.pageData(page)
+      <div>
+        {pageNavigation(page, _dataSource.pageSize, totalPages)}
+        <table class={tableCssClass} id={tableId}>
+          {headers}
+          <tbody>
+            {data.map(row)}
+          </tbody>
+        </table>
+      </div>
+    } catch {
+      case e: IndexOutOfBoundsException =>
+        val PageData(totalPages, _) = _dataSource.pageData(1)
+        <div>
+          {pageNavigation(1, _dataSource.pageSize, totalPages)}
+          <div class="alert alert-error">{e.getMessage}</div>
+        </div>
+    }
+  }
+
+  /**
+   * Return a page navigation.
+   * <ul>
+   *   <li>If the totalPages is 1, the page navigation will be empty</li>
+   *   <li>
+   *     If the totalPages is more than 1, it will create a page navigation including a group of
+   *     page numbers and a form to submit the page number.
+   *   </li>
+   * </ul>
+   *
+   * Here are some examples of the page navigation:
+   * {{{
+   * << < 11 12 13* 14 15 16 17 18 19 20 > >>
+   *
+   * This is the first group, so "<<" is hidden.
+   * < 1 2* 3 4 5 6 7 8 9 10 > >>
+   *
+   * This is the first group and the first page, so "<<" and "<" are hidden.
+   * 1* 2 3 4 5 6 7 8 9 10 > >>
+   *
+   * Assume totalPages is 19. This is the last group, so ">>" is hidden.
+   * << < 11 12 13* 14 15 16 17 18 19 >
+   *
+   * Assume totalPages is 19. This is the last group and the last page, so ">>" and ">" are hidden.
+   * << < 11 12 13 14 15 16 17 18 19*
+   *
+   * * means the current page number
+   * << means jumping to the first page of the previous group.
+   * < means jumping to the previous page.
+   * >> means jumping to the first page of the next group.
+   * > means jumping to the next page.
+   * }}}
+   */
+  private[ui] def pageNavigation(page: Int, pageSize: Int, totalPages: Int): Seq[Node] = {
+    if (totalPages == 1) {
+      Nil
+    } else {
+      // A group includes all the page numbers that will be shown in the page navigation.
+      // A group size of 10 means that 10 page numbers will be shown.
+      // The first group is 1 to 10, the second is 11 to 20, and so on
+      val groupSize = 10
+      val firstGroup = 0
+      val lastGroup = (totalPages - 1) / groupSize
+      val currentGroup = (page - 1) / groupSize
+      val startPage = currentGroup * groupSize + 1
+      val endPage = totalPages.min(startPage + groupSize - 1)
+      val pageTags = (startPage to endPage).map { p =>
+        if (p == page) {
+          // The current page should be disabled so that it cannot be clicked.
+          <li class="disabled"><a href="#">{p}</a></li>
+        } else {
+          <li><a href={pageLink(p)}>{p}</a></li>
+        }
+      }
+      val (goButtonJsFuncName, goButtonJsFunc) = goButtonJavascriptFunction
+      // When clicking the "Go" button, it will call this javascript method and then call
+      // "goButtonJsFuncName"
+      val formJs =
+        s"""$$(function(){
+          |  $$( "#form-task-page" ).submit(function(event) {
+          |    var page = $$("#form-task-page-no").val()
+          |    var pageSize = $$("#form-task-page-size").val()
+          |    pageSize = pageSize ? pageSize: 100;
+          |    if (page != "") {
+          |      ${goButtonJsFuncName}(page, pageSize);
+          |    }
+          |    event.preventDefault();
+          |  });
+          |});
+        """.stripMargin
+
+      <div>
+        <div>
+          <form id="form-task-page" class="form-inline pull-right" style="margin-bottom: 0px;">
+            <label>{totalPages} Pages. Jump to</label>
+            <input type="text" id="form-task-page-no" value={page.toString} class="span1" />
+            <label>. Show </label>
+            <input type="text" id="form-task-page-size" value={pageSize.toString} class="span1" />
+            <label>tasks in a page.</label>
+            <button type="submit" class="btn">Go</button>
+          </form>
+        </div>
+        <div class="pagination" style="margin-bottom: 0px;">
+          <span style="float: left; padding-top: 4px; padding-right: 4px;">Page: </span>
+          <ul>
+            {if (currentGroup > firstGroup) {
+            <li>
+              <a href={pageLink(startPage - groupSize)} aria-label="Previous Group">
+                <span aria-hidden="true">
+                  &lt;&lt;
+                </span>
+              </a>
+            </li>
+            }}
+            {if (page > 1) {
+            <li>
+            <a href={pageLink(page - 1)} aria-label="Previous">
+              <span aria-hidden="true">
+                &lt;
+              </span>
+            </a>
+            </li>
+            }}
+            {pageTags}
+            {if (page < totalPages) {
+            <li>
+              <a href={pageLink(page + 1)} aria-label="Next">
+                <span aria-hidden="true">&gt;</span>
+              </a>
+            </li>
+            }}
+            {if (currentGroup < lastGroup) {
+            <li>
+              <a href={pageLink(startPage + groupSize)} aria-label="Next Group">
+                <span aria-hidden="true">
+                  &gt;&gt;
+                </span>
+              </a>
+            </li>
+          }}
+          </ul>
+        </div>
+        <script>
+          {Unparsed(goButtonJsFunc)}
+
+          {Unparsed(formJs)}
+        </script>
+      </div>
+    }
+  }
+
+  /**
+   * Return a link to jump to a page.
+   */
+  def pageLink(page: Int): String
+
+  /**
+   * Only the implementation knows how to create the url with a page number and the page size, so we
+   * leave this one to the implementation. The implementation should create a JavaScript method that
+   * accepts a page number along with the page size and jumps to the page. The return value is the
+   * name of that method and its JavaScript code.
+   */
+  def goButtonJavascriptFunction: (String, String)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1ee3/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 6e077bf..cf04b5e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ui.jobs
 
+import java.net.URLEncoder
 import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
@@ -27,13 +28,14 @@ import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
-import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
+import org.apache.spark.ui._
 import org.apache.spark.ui.jobs.UIData._
-import org.apache.spark.ui.scope.RDDOperationGraph
 import org.apache.spark.util.{Utils, Distribution}
 
 /** Page showing statistics and task list for a given stage */
 private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
+  import StagePage._
+
   private val progressListener = parent.progressListener
   private val operationGraphListener = parent.operationGraphListener
 
@@ -74,6 +76,16 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
       val parameterAttempt = request.getParameter("attempt")
       require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter")
 
+      val parameterTaskPage = request.getParameter("task.page")
+      val parameterTaskSortColumn = request.getParameter("task.sort")
+      val parameterTaskSortDesc = request.getParameter("task.desc")
+      val parameterTaskPageSize = request.getParameter("task.pageSize")
+
+      val taskPage = Option(parameterTaskPage).map(_.toInt).getOrElse(1)
+      val taskSortColumn = Option(parameterTaskSortColumn).getOrElse("Index")
+      val taskSortDesc = Option(parameterTaskSortDesc).map(_.toBoolean).getOrElse(false)
+      val taskPageSize = Option(parameterTaskPageSize).map(_.toInt).getOrElse(100)
+
       // If this is set, expand the dag visualization by default
       val expandDagVizParam = request.getParameter("expandDagViz")
       val expandDagViz = expandDagVizParam != null && expandDagVizParam.toBoolean
@@ -231,52 +243,47 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         accumulableRow,
         accumulables.values.toSeq)
 
-      val taskHeadersAndCssClasses: Seq[(String, String)] =
-        Seq(
-          ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
-          ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
-          ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
-          ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
-          ("GC Time", ""),
-          ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
-          ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
-        {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
-        {if (stageData.hasInput) Seq(("Input Size / Records", "")) else Nil} ++
-        {if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
-        {if (stageData.hasShuffleRead) {
-          Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
-            ("Shuffle Read Size / Records", ""),
-            ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
-        } else {
-          Nil
-        }} ++
-        {if (stageData.hasShuffleWrite) {
-          Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
-        } else {
-          Nil
-        }} ++
-        {if (stageData.hasBytesSpilled) {
-          Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
-        } else {
-          Nil
-        }} ++
-        Seq(("Errors", ""))
-
-      val unzipped = taskHeadersAndCssClasses.unzip
-
       val currentTime = System.currentTimeMillis()
-      val taskTable = UIUtils.listingTable(
-        unzipped._1,
-        taskRow(
+      val (taskTable, taskTableHTML) = try {
+        val _taskTable = new TaskPagedTable(
+          UIUtils.prependBaseUri(parent.basePath) +
+            s"/stages/stage?id=${stageId}&attempt=${stageAttemptId}",
+          tasks,
           hasAccumulators,
           stageData.hasInput,
           stageData.hasOutput,
           stageData.hasShuffleRead,
           stageData.hasShuffleWrite,
           stageData.hasBytesSpilled,
-          currentTime),
-        tasks,
-        headerClasses = unzipped._2)
+          currentTime,
+          pageSize = taskPageSize,
+          sortColumn = taskSortColumn,
+          desc = taskSortDesc
+        )
+        (_taskTable, _taskTable.table(taskPage))
+      } catch {
+        case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
+          (null, <div class="alert alert-error">{e.getMessage}</div>)
+      }
+
+      val jsForScrollingDownToTaskTable =
+        <script>
+          {Unparsed {
+            """
+              |$(function() {
+              |  if (/.*&task.sort=.*$/.test(location.search)) {
+              |    var topOffset = $("#tasks-section").offset().top;
+              |    $("html,body").animate({scrollTop: topOffset}, 200);
+              |  }
+              |});
+            """.stripMargin
+           }
+          }
+        </script>
+
+      val taskIdsInPage = if (taskTable == null) Set.empty[Long]
+        else taskTable.dataSource.slicedTaskIds
+
       // Excludes tasks which failed and have incomplete metrics
       val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
 
@@ -499,12 +506,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         dagViz ++
         maybeExpandDagViz ++
         showAdditionalMetrics ++
-        makeTimeline(stageData.taskData.values.toSeq, currentTime) ++
+        makeTimeline(
+          // Only show the tasks in the table
+          stageData.taskData.values.toSeq.filter(t => taskIdsInPage.contains(t.taskInfo.taskId)),
+          currentTime) ++
         <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
         <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
         <h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
         maybeAccumulableTable ++
-        <h4>Tasks</h4> ++ taskTable
+        <h4 id="tasks-section">Tasks</h4> ++ taskTableHTML ++ jsForScrollingDownToTaskTable
       UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true)
     }
   }
@@ -679,164 +689,619 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
     </script>
   }
 
-  def taskRow(
-      hasAccumulators: Boolean,
-      hasInput: Boolean,
-      hasOutput: Boolean,
-      hasShuffleRead: Boolean,
-      hasShuffleWrite: Boolean,
-      hasBytesSpilled: Boolean,
-      currentTime: Long)(taskData: TaskUIData): Seq[Node] = {
-    taskData match { case TaskUIData(info, metrics, errorMessage) =>
-      val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
-        else metrics.map(_.executorRunTime).getOrElse(1L)
-      val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
-        else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
-      val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
-      val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
-      val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
-      val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
-      val gettingResultTime = getGettingResultTime(info, currentTime)
-
-      val maybeAccumulators = info.accumulables
-      val accumulatorsReadable = maybeAccumulators.map { acc =>
-        StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
+}
+
+private[ui] object StagePage {
+  private[ui] def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = {
+    if (info.gettingResult) { // a value only exists once result fetching has begun
+      if (info.finished) {
+        info.finishTime - info.gettingResultTime // fetch lasted until the task finished
+      } else {
+        // The task is still fetching the result.
+        currentTime - info.gettingResultTime
       }
+    } else {
+      0L // info.gettingResult is false: nothing to measure
+    }
+  }
 
-      val maybeInput = metrics.flatMap(_.inputMetrics)
-      val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
-      val inputReadable = maybeInput
-        .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
-        .getOrElse("")
-      val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
-
-      val maybeOutput = metrics.flatMap(_.outputMetrics)
-      val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("")
-      val outputReadable = maybeOutput
-        .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
-        .getOrElse("")
-      val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
-
-      val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
-      val shuffleReadBlockedTimeSortable = maybeShuffleRead
-        .map(_.fetchWaitTime.toString).getOrElse("")
-      val shuffleReadBlockedTimeReadable =
-        maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
-
-      val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
-      val shuffleReadSortable = totalShuffleBytes.map(_.toString).getOrElse("")
-      val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
-      val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
-
-      val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
-      val shuffleReadRemoteSortable = remoteShuffleBytes.map(_.toString).getOrElse("")
-      val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
-
-      val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
-      val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
-      val shuffleWriteReadable = maybeShuffleWrite
-        .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
-      val shuffleWriteRecords = maybeShuffleWrite
-        .map(_.shuffleRecordsWritten.toString).getOrElse("")
-
-      val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
-      val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
-      val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
-        if (ms == 0) "" else UIUtils.formatDuration(ms)
-      }.getOrElse("")
-
-      val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
-      val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
-      val memoryBytesSpilledReadable =
-        maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
-      val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
-      val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
-      val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
-      <tr>
-        <td>{info.index}</td>
-        <td>{info.taskId}</td>
-        <td sorttable_customkey={info.attempt.toString}>{
-          if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
-        }</td>
-        <td>{info.status}</td>
-        <td>{info.taskLocality}</td>
-        <td>{info.executorId} / {info.host}</td>
-        <td>{UIUtils.formatDate(new Date(info.launchTime))}</td>
-        <td sorttable_customkey={duration.toString}>
-          {formatDuration}
-        </td>
-        <td sorttable_customkey={schedulerDelay.toString}
-            class={TaskDetailsClassNames.SCHEDULER_DELAY}>
-          {UIUtils.formatDuration(schedulerDelay.toLong)}
-        </td>
-        <td sorttable_customkey={taskDeserializationTime.toString}
-            class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
-          {UIUtils.formatDuration(taskDeserializationTime.toLong)}
-        </td>
-        <td sorttable_customkey={gcTime.toString}>
-          {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
-        </td>
-        <td sorttable_customkey={serializationTime.toString}
-            class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
-          {UIUtils.formatDuration(serializationTime)}
-        </td>
-        <td sorttable_customkey={gettingResultTime.toString}
-            class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
-          {UIUtils.formatDuration(gettingResultTime)}
-        </td>
-        {if (hasAccumulators) {
-          <td>
-            {Unparsed(accumulatorsReadable.mkString("<br/>"))}
-          </td>
-        }}
-        {if (hasInput) {
-          <td sorttable_customkey={inputSortable}>
-            {s"$inputReadable / $inputRecords"}
-          </td>
-        }}
-        {if (hasOutput) {
-          <td sorttable_customkey={outputSortable}>
-            {s"$outputReadable / $outputRecords"}
-          </td>
-        }}
+  private[ui] def getSchedulerDelay(
+      info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
+    if (info.finished) {
+      val totalExecutionTime = info.finishTime - info.launchTime // launch-to-finish span
+      val executorOverhead = (metrics.executorDeserializeTime +
+        metrics.resultSerializationTime)
+      math.max(
+        0, // never report a negative delay
+        totalExecutionTime - metrics.executorRunTime - executorOverhead -
+          getGettingResultTime(info, currentTime)) // what is left after all measured phases
+    } else {
+      // The task is still running and the metrics like executorRunTime are not available.
+      0L
+    }
+  }
+}
+
+private[ui] case class TaskTableRowInputData(inputSortable: Long, inputReadable: String)
+
+private[ui] case class TaskTableRowOutputData(outputSortable: Long, outputReadable: String)
+
+private[ui] case class TaskTableRowShuffleReadData( // *Sortable = sort key, *Readable = cell text
+    shuffleReadBlockedTimeSortable: Long,
+    shuffleReadBlockedTimeReadable: String,
+    shuffleReadSortable: Long,
+    shuffleReadReadable: String,
+    shuffleReadRemoteSortable: Long,
+    shuffleReadRemoteReadable: String)
+
+private[ui] case class TaskTableRowShuffleWriteData( // *Sortable = sort key, *Readable = cell text
+    writeTimeSortable: Long,
+    writeTimeReadable: String,
+    shuffleWriteSortable: Long,
+    shuffleWriteReadable: String)
+
+private[ui] case class TaskTableRowBytesSpilledData( // *Sortable = sort key, *Readable = cell text
+    memoryBytesSpilledSortable: Long,
+    memoryBytesSpilledReadable: String,
+    diskBytesSpilledSortable: Long,
+    diskBytesSpilledReadable: String)
+
+/**
+ * Contains all the data needed for sorting and generating HTML. This is used rather than
+ * TaskUIData to avoid creating duplicate contents while sorting the data.
+ */
+private[ui] case class TaskTableRowData(
+    index: Int,
+    taskId: Long,
+    attempt: Int,
+    speculative: Boolean,
+    status: String,
+    taskLocality: String,
+    executorIdAndHost: String,
+    launchTime: Long,
+    duration: Long,
+    formatDuration: String,
+    schedulerDelay: Long,
+    taskDeserializationTime: Long,
+    gcTime: Long,
+    serializationTime: Long,
+    gettingResultTime: Long,
+    accumulators: Option[String], // HTML: escaped entries joined by <br/>, rendered unparsed
+    input: Option[TaskTableRowInputData],
+    output: Option[TaskTableRowOutputData],
+    shuffleRead: Option[TaskTableRowShuffleReadData],
+    shuffleWrite: Option[TaskTableRowShuffleWriteData],
+    bytesSpilled: Option[TaskTableRowBytesSpilledData],
+    error: String) // empty string when the task carries no error message
+
+private[ui] class TaskDataSource( // converts all rows, sorts once, then serves page slices
+    tasks: Seq[TaskUIData],
+    hasAccumulators: Boolean,
+    hasInput: Boolean,
+    hasOutput: Boolean,
+    hasShuffleRead: Boolean,
+    hasShuffleWrite: Boolean,
+    hasBytesSpilled: Boolean,
+    currentTime: Long,
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean) extends PagedDataSource[TaskTableRowData](pageSize) {
+  import StagePage._
+
+  // Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table
+  // so that we avoid creating duplicate contents while sorting the data
+  private val data = tasks.map(taskRow).sorted(ordering(sortColumn, desc))
+
+  private var _slicedTaskIds: Set[Long] = Set.empty[Long] // empty (not null) until first slice
+
+  override def dataSize: Int = data.size
+
+  override def sliceData(from: Int, to: Int): Seq[TaskTableRowData] = {
+    val r = data.slice(from, to)
+    _slicedTaskIds = r.map(_.taskId).toSet // remember which tasks are on this page
+    r
+  }
+
+  def slicedTaskIds: Set[Long] = _slicedTaskIds // ids of the most recently sliced page
+
+  private def taskRow(taskData: TaskUIData): TaskTableRowData = {
+    val TaskUIData(info, metrics, errorMessage) = taskData
+    val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
+      else metrics.map(_.executorRunTime).getOrElse(1L) // NOTE(review): fallback is 1L, not 0L
+    val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
+      else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
+    val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
+    val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
+    val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
+    val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
+    val gettingResultTime = getGettingResultTime(info, currentTime)
+
+    val maybeAccumulators = info.accumulables
+    val accumulatorsReadable = maybeAccumulators.map { acc => // escaped here, joined as HTML below
+      StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
+    }
+
+    val maybeInput = metrics.flatMap(_.inputMetrics)
+    val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L)
+    val inputReadable = maybeInput
+      .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
+      .getOrElse("")
+    val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
+
+    val maybeOutput = metrics.flatMap(_.outputMetrics)
+    val outputSortable = maybeOutput.map(_.bytesWritten).getOrElse(0L)
+    val outputReadable = maybeOutput
+      .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
+      .getOrElse("")
+    val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
+
+    val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
+    val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime).getOrElse(0L)
+    val shuffleReadBlockedTimeReadable =
+      maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
+
+    val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
+    val shuffleReadSortable = totalShuffleBytes.getOrElse(0L)
+    val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
+    val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
+
+    val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
+    val shuffleReadRemoteSortable = remoteShuffleBytes.getOrElse(0L)
+    val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
+
+    val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
+    val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten).getOrElse(0L)
+    val shuffleWriteReadable = maybeShuffleWrite
+      .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
+    val shuffleWriteRecords = maybeShuffleWrite
+      .map(_.shuffleRecordsWritten.toString).getOrElse("")
+
+    val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
+    val writeTimeSortable = maybeWriteTime.getOrElse(0L)
+    val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
+      if (ms == 0) "" else UIUtils.formatDuration(ms) // blank cell when it rounds down to 0
+    }.getOrElse("")
+
+    val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
+    val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.getOrElse(0L)
+    val memoryBytesSpilledReadable =
+      maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
+
+    val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
+    val diskBytesSpilledSortable = maybeDiskBytesSpilled.getOrElse(0L)
+    val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
+
+    val input =
+      if (hasInput) { // table-wide flag: every row gets Some, or every row gets None
+        Some(TaskTableRowInputData(inputSortable, s"$inputReadable / $inputRecords"))
+      } else {
+        None
+      }
+
+    val output =
+      if (hasOutput) {
+        Some(TaskTableRowOutputData(outputSortable, s"$outputReadable / $outputRecords"))
+      } else {
+        None
+      }
+
+    val shuffleRead =
+      if (hasShuffleRead) {
+        Some(TaskTableRowShuffleReadData(
+          shuffleReadBlockedTimeSortable,
+          shuffleReadBlockedTimeReadable,
+          shuffleReadSortable,
+          s"$shuffleReadReadable / $shuffleReadRecords",
+          shuffleReadRemoteSortable,
+          shuffleReadRemoteReadable
+        ))
+      } else {
+        None
+      }
+
+    val shuffleWrite =
+      if (hasShuffleWrite) {
+        Some(TaskTableRowShuffleWriteData(
+          writeTimeSortable,
+          writeTimeReadable,
+          shuffleWriteSortable,
+          s"$shuffleWriteReadable / $shuffleWriteRecords"
+        ))
+      } else {
+        None
+      }
+
+    val bytesSpilled =
+      if (hasBytesSpilled) {
+        Some(TaskTableRowBytesSpilledData(
+          memoryBytesSpilledSortable,
+          memoryBytesSpilledReadable,
+          diskBytesSpilledSortable,
+          diskBytesSpilledReadable
+        ))
+      } else {
+        None
+      }
+
+    TaskTableRowData(
+      info.index,
+      info.taskId,
+      info.attempt,
+      info.speculative,
+      info.status,
+      info.taskLocality.toString,
+      s"${info.executorId} / ${info.host}",
+      info.launchTime,
+      duration,
+      formatDuration,
+      schedulerDelay,
+      taskDeserializationTime,
+      gcTime,
+      serializationTime,
+      gettingResultTime,
+      if (hasAccumulators) Some(accumulatorsReadable.mkString("<br/>")) else None,
+      input,
+      output,
+      shuffleRead,
+      shuffleWrite,
+      bytesSpilled,
+      errorMessage.getOrElse("")
+    )
+  }
+
+  /**
+   * Return the Ordering for sortColumn, reversed when desc; rejects unknown or unavailable columns
+   */
+  private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = {
+    val ordering = sortColumn match {
+      case "Index" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.Int.compare(x.index, y.index)
+      }
+      case "ID" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.Long.compare(x.taskId, y.taskId)
+      }
+      case "Attempt" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.Int.compare(x.attempt, y.attempt)
+      }
+      case "Status" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.String.compare(x.status, y.status)
+      }
+      case "Locality Level" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.String.compare(x.taskLocality, y.taskLocality)
+      }
+      case "Executor ID / Host" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.String.compare(x.executorIdAndHost, y.executorIdAndHost)
+      }
+      case "Launch Time" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.Long.compare(x.launchTime, y.launchTime)
+      }
+      case "Duration" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.Long.compare(x.duration, y.duration)
+      }
+      case "Scheduler Delay" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.Long.compare(x.schedulerDelay, y.schedulerDelay)
+      }
+      case "Task Deserialization Time" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.Long.compare(x.taskDeserializationTime, y.taskDeserializationTime)
+      }
+      case "GC Time" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.Long.compare(x.gcTime, y.gcTime)
+      }
+      case "Result Serialization Time" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.Long.compare(x.serializationTime, y.serializationTime)
+      }
+      case "Getting Result Time" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.Long.compare(x.gettingResultTime, y.gettingResultTime)
+      }
+      case "Accumulators" =>
+        if (hasAccumulators) { // guarantees accumulators is defined on every row, so .get is safe
+          new Ordering[TaskTableRowData] {
+            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+              Ordering.String.compare(x.accumulators.get, y.accumulators.get)
+          }
+        } else {
+          throw new IllegalArgumentException(
+            "Cannot sort by Accumulators because of no accumulators")
+        }
+      case "Input Size / Records" =>
+        if (hasInput) {
+          new Ordering[TaskTableRowData] {
+            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+              Ordering.Long.compare(x.input.get.inputSortable, y.input.get.inputSortable)
+          }
+        } else {
+          throw new IllegalArgumentException(
+            "Cannot sort by Input Size / Records because of no inputs")
+        }
+      case "Output Size / Records" =>
+        if (hasOutput) {
+          new Ordering[TaskTableRowData] {
+            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+              Ordering.Long.compare(x.output.get.outputSortable, y.output.get.outputSortable)
+          }
+        } else {
+          throw new IllegalArgumentException(
+            "Cannot sort by Output Size / Records because of no outputs")
+        }
+      // ShuffleRead
+      case "Shuffle Read Blocked Time" =>
+        if (hasShuffleRead) {
+          new Ordering[TaskTableRowData] {
+            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+              Ordering.Long.compare(x.shuffleRead.get.shuffleReadBlockedTimeSortable,
+                y.shuffleRead.get.shuffleReadBlockedTimeSortable)
+          }
+        } else {
+          throw new IllegalArgumentException(
+            "Cannot sort by Shuffle Read Blocked Time because of no shuffle reads")
+        }
+      case "Shuffle Read Size / Records" =>
+        if (hasShuffleRead) {
+          new Ordering[TaskTableRowData] {
+            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+              Ordering.Long.compare(x.shuffleRead.get.shuffleReadSortable,
+                y.shuffleRead.get.shuffleReadSortable)
+          }
+        } else {
+          throw new IllegalArgumentException(
+            "Cannot sort by Shuffle Read Size / Records because of no shuffle reads")
+        }
+      case "Shuffle Remote Reads" =>
+        if (hasShuffleRead) {
+          new Ordering[TaskTableRowData] {
+            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+              Ordering.Long.compare(x.shuffleRead.get.shuffleReadRemoteSortable,
+                y.shuffleRead.get.shuffleReadRemoteSortable)
+          }
+        } else {
+          throw new IllegalArgumentException(
+            "Cannot sort by Shuffle Remote Reads because of no shuffle reads")
+        }
+      // ShuffleWrite
+      case "Write Time" =>
+        if (hasShuffleWrite) {
+          new Ordering[TaskTableRowData] {
+            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+              Ordering.Long.compare(x.shuffleWrite.get.writeTimeSortable,
+                y.shuffleWrite.get.writeTimeSortable)
+          }
+        } else {
+          throw new IllegalArgumentException(
+            "Cannot sort by Write Time because of no shuffle writes")
+        }
+      case "Shuffle Write Size / Records" =>
+        if (hasShuffleWrite) {
+          new Ordering[TaskTableRowData] {
+            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+              Ordering.Long.compare(x.shuffleWrite.get.shuffleWriteSortable,
+                y.shuffleWrite.get.shuffleWriteSortable)
+          }
+        } else {
+          throw new IllegalArgumentException(
+            "Cannot sort by Shuffle Write Size / Records because of no shuffle writes")
+        }
+      // BytesSpilled
+      case "Shuffle Spill (Memory)" =>
+        if (hasBytesSpilled) {
+          new Ordering[TaskTableRowData] {
+            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+              Ordering.Long.compare(x.bytesSpilled.get.memoryBytesSpilledSortable,
+                y.bytesSpilled.get.memoryBytesSpilledSortable)
+          }
+        } else {
+          throw new IllegalArgumentException(
+            "Cannot sort by Shuffle Spill (Memory) because of no spills")
+        }
+      case "Shuffle Spill (Disk)" =>
+        if (hasBytesSpilled) {
+          new Ordering[TaskTableRowData] {
+            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+              Ordering.Long.compare(x.bytesSpilled.get.diskBytesSpilledSortable,
+                y.bytesSpilled.get.diskBytesSpilledSortable)
+          }
+        } else {
+          throw new IllegalArgumentException(
+            "Cannot sort by Shuffle Spill (Disk) because of no spills")
+        }
+      case "Errors" => new Ordering[TaskTableRowData] {
+        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+          Ordering.String.compare(x.error, y.error)
+      }
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
+    } else {
+      ordering
+    }
+  }
+
+}
+
+private[ui] class TaskPagedTable(
+    basePath: String,
+    data: Seq[TaskUIData],
+    hasAccumulators: Boolean,
+    hasInput: Boolean,
+    hasOutput: Boolean,
+    hasShuffleRead: Boolean,
+    hasShuffleWrite: Boolean,
+    hasBytesSpilled: Boolean,
+    currentTime: Long,
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean) extends PagedTable[TaskTableRowData]{
+
+  override def tableId: String = ""
+
+  override def tableCssClass: String = "table table-bordered table-condensed table-striped"
+
+  override val dataSource: TaskDataSource = new TaskDataSource( // eager val: bad sortColumn throws
+    data,
+    hasAccumulators,
+    hasInput,
+    hasOutput,
+    hasShuffleRead,
+    hasShuffleWrite,
+    hasBytesSpilled,
+    currentTime,
+    pageSize,
+    sortColumn,
+    desc
+  )
+
+  override def pageLink(page: Int): String = { // assumes basePath already contains '?' - confirm
+    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") // names have spaces and '/'
+    s"${basePath}&task.page=$page&task.sort=${encodedSortColumn}&task.desc=${desc}" +
+      s"&task.pageSize=${pageSize}"
+  }
+
+  override def goButtonJavascriptFunction: (String, String) = {
+    val jsFuncName = "goToTaskPage" // must match the function defined in jsFunc below
+    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+    val jsFunc = s"""
+      |currentTaskPageSize = ${pageSize}
+      |function goToTaskPage(page, pageSize) {
+      |  // Set page to 1 if the page size changes
+      |  page = pageSize == currentTaskPageSize ? page : 1;
+      |  var url = "${basePath}&task.sort=${encodedSortColumn}&task.desc=${desc}" +
+      |    "&task.page=" + page + "&task.pageSize=" + pageSize;
+      |  window.location.href = url;
+      |}
+     """.stripMargin
+    (jsFuncName, jsFunc) // (function name, JavaScript source defining it)
+  }
+
+  def headers: Seq[Node] = { // clickable <thead>; a click reloads the page re-sorted
+    val taskHeadersAndCssClasses: Seq[(String, String)] =
+      Seq(
+        ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
+        ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
+        ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
+        ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
+        ("GC Time", ""),
+        ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
+        ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
+        {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
+        {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++
+        {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
         {if (hasShuffleRead) {
-           <td sorttable_customkey={shuffleReadBlockedTimeSortable}
-             class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
-             {shuffleReadBlockedTimeReadable}
-           </td>
-           <td sorttable_customkey={shuffleReadSortable}>
-             {s"$shuffleReadReadable / $shuffleReadRecords"}
-           </td>
-           <td sorttable_customkey={shuffleReadRemoteSortable}
-               class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
-             {shuffleReadRemoteReadable}
-           </td>
-        }}
+          Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
+            ("Shuffle Read Size / Records", ""),
+            ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
+        } else {
+          Nil
+        }} ++
         {if (hasShuffleWrite) {
-           <td sorttable_customkey={writeTimeSortable}>
-             {writeTimeReadable}
-           </td>
-           <td sorttable_customkey={shuffleWriteSortable}>
-             {s"$shuffleWriteReadable / $shuffleWriteRecords"}
-           </td>
-        }}
+          Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
+        } else {
+          Nil
+        }} ++
         {if (hasBytesSpilled) {
-          <td sorttable_customkey={memoryBytesSpilledSortable}>
-            {memoryBytesSpilledReadable}
-          </td>
-          <td sorttable_customkey={diskBytesSpilledSortable}>
-            {diskBytesSpilledReadable}
-          </td>
-        }}
-        {errorMessageCell(errorMessage)}
-      </tr>
+          Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
+        } else {
+          Nil
+        }} ++
+        Seq(("Errors", ""))
+
+    if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
+      throw new IllegalArgumentException(s"Unknown column: $sortColumn")
     }
+
+    val headerRow: Seq[Node] = {
+      taskHeadersAndCssClasses.map { case (header, cssClass) =>
+        if (header == sortColumn) { // active column: link toggles desc, arrow shows it
+          val headerLink =
+            s"$basePath&task.sort=${URLEncoder.encode(header, "UTF-8")}&task.desc=${!desc}" +
+              s"&task.pageSize=${pageSize}"
+          val js = Unparsed(s"window.location.href='${headerLink}'")
+          val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // down triangle if desc, else up
+          <th class={cssClass} onclick={js} style="cursor: pointer;">
+            {header}
+            <span>&nbsp;{Unparsed(arrow)}</span>
+          </th>
+        } else { // other columns: link carries no task.desc parameter
+          val headerLink =
+            s"$basePath&task.sort=${URLEncoder.encode(header, "UTF-8")}&task.pageSize=${pageSize}"
+          val js = Unparsed(s"window.location.href='${headerLink}'")
+          <th class={cssClass} onclick={js} style="cursor: pointer;">
+            {header}
+          </th>
+        }
+      }
+    }
+    <thead>{headerRow}</thead>
+  }
+
+  def row(task: TaskTableRowData): Seq[Node] = { // cell order must mirror the headers method
+    <tr>
+      <td>{task.index}</td>
+      <td>{task.taskId}</td>
+      <td>{if (task.speculative) s"${task.attempt} (speculative)" else task.attempt.toString}</td>
+      <td>{task.status}</td>
+      <td>{task.taskLocality}</td>
+      <td>{task.executorIdAndHost}</td>
+      <td>{UIUtils.formatDate(new Date(task.launchTime))}</td>
+      <td>{task.formatDuration}</td>
+      <td class={TaskDetailsClassNames.SCHEDULER_DELAY}>
+        {UIUtils.formatDuration(task.schedulerDelay)}
+      </td>
+      <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
+        {UIUtils.formatDuration(task.taskDeserializationTime)}
+      </td>
+      <td>
+        {if (task.gcTime > 0) UIUtils.formatDuration(task.gcTime) else ""}
+      </td>
+      <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
+        {UIUtils.formatDuration(task.serializationTime)}
+      </td>
+      <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
+        {UIUtils.formatDuration(task.gettingResultTime)}
+      </td>
+      {if (task.accumulators.nonEmpty) { // value is ready-made HTML, hence Unparsed
+        <td>{Unparsed(task.accumulators.get)}</td>
+      }}
+      {if (task.input.nonEmpty) {
+        <td>{task.input.get.inputReadable}</td>
+      }}
+      {if (task.output.nonEmpty) {
+        <td>{task.output.get.outputReadable}</td>
+      }}
+      {if (task.shuffleRead.nonEmpty) {
+        <td class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
+          {task.shuffleRead.get.shuffleReadBlockedTimeReadable}
+        </td>
+        <td>{task.shuffleRead.get.shuffleReadReadable}</td>
+        <td class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+          {task.shuffleRead.get.shuffleReadRemoteReadable}
+        </td>
+      }}
+      {if (task.shuffleWrite.nonEmpty) {
+        <td>{task.shuffleWrite.get.writeTimeReadable}</td>
+        <td>{task.shuffleWrite.get.shuffleWriteReadable}</td>
+      }}
+      {if (task.bytesSpilled.nonEmpty) {
+        <td>{task.bytesSpilled.get.memoryBytesSpilledReadable}</td>
+        <td>{task.bytesSpilled.get.diskBytesSpilledReadable}</td>
+      }}
+      {errorMessageCell(task.error)}
+    </tr>
   }
 
-  private def errorMessageCell(errorMessage: Option[String]): Seq[Node] = {
-    val error = errorMessage.getOrElse("")
+  private def errorMessageCell(error: String): Seq[Node] = {
     val isMultiline = error.indexOf('\n') >= 0
     // Display the first line by default
     val errorSummary = StringEscapeUtils.escapeHtml4(
@@ -860,32 +1325,4 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
     }
     <td>{errorSummary}{details}</td>
   }
-
-  private def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = {
-    if (info.gettingResult) {
-      if (info.finished) {
-        info.finishTime - info.gettingResultTime
-      } else {
-        // The task is still fetching the result.
-        currentTime - info.gettingResultTime
-      }
-    } else {
-      0L
-    }
-  }
-
-  private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
-    if (info.finished) {
-      val totalExecutionTime = info.finishTime - info.launchTime
-      val executorOverhead = (metrics.executorDeserializeTime +
-        metrics.resultSerializationTime)
-      math.max(
-        0,
-        totalExecutionTime - metrics.executorRunTime - executorOverhead -
-          getGettingResultTime(info, currentTime))
-    } else {
-      // The task is still running and the metrics like executorRunTime are not available.
-      0L
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1ee3/core/src/test/scala/org/apache/spark/ui/PagedTableSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/PagedTableSuite.scala b/core/src/test/scala/org/apache/spark/ui/PagedTableSuite.scala
new file mode 100644
index 0000000..cc76c14
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/PagedTableSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import scala.xml.Node
+
+import org.apache.spark.SparkFunSuite
+
+/** Exercises `pageData` through the Seq-backed data source defined at the end of this file. */
+class PagedDataSourceSuite extends SparkFunSuite {
+
+  test("basic") {
+    // Every check builds its own source: the numbers 1 to 5 split into pages of two.
+    def freshSource(): SeqPagedDataSource[Int] = new SeqPagedDataSource[Int](1 to 5, pageSize = 2)
+
+    // Pages that exist, paired with the data slice each one is expected to return.
+    val validPages = Seq[(Int, Seq[Int])](1 -> (1 to 2), 2 -> (3 to 4), 3 -> Seq(5))
+    for ((page, expectedSlice) <- validPages) {
+      assert(freshSource().pageData(page) === PageData(3, expectedSlice))
+    }
+
+    // Pages that must be rejected, paired with the exact error message each one produces.
+    val invalidPages = Seq(
+      4 -> "Page 4 is out of range. Please select a page number between 1 and 3.",
+      0 -> "Page 0 is out of range. Please select a page number between 1 and 3.")
+    for ((page, expectedMessage) <- invalidPages) {
+      val source = freshSource()
+      val error = intercept[IndexOutOfBoundsException] {
+        source.pageData(page)
+      }
+      assert(error.getMessage === expectedMessage)
+    }
+  }
+}
+
+/** Checks the link labels produced by `PagedTable.pageNavigation`. */
+class PagedTableSuite extends SparkFunSuite {
+  test("pageNavigation") {
+    // Stub table: pageLink echoes the page number; all other members are placeholders.
+    val stubTable = new PagedTable[Int] {
+      override def tableId: String = ""
+      override def tableCssClass: String = ""
+      override def dataSource: PagedDataSource[Int] = null
+      override def pageLink(page: Int): String = page.toString
+      override def headers: Seq[Node] = Nil
+      override def row(t: Int): Seq[Node] = Nil
+      override def goButtonJavascriptFunction: (String, String) = ("", "")
+    }
+
+    // Trimmed text of every <li> found under the first node that pageNavigation returns.
+    // NOTE(review): the argument names are inferred from the expectations below; confirm.
+    def labels(page: Int, pageSize: Int, totalPages: Int): Seq[String] = {
+      val navigation = stubTable.pageNavigation(page, pageSize, totalPages)
+      (navigation.head \\ "li").map(_.text.trim)
+    }
+
+    // The page-number labels from `first` to `last`, inclusive.
+    def pages(first: Int, last: Int): Seq[String] = (first to last).map(_.toString)
+
+    assert(stubTable.pageNavigation(1, 10, 1) === Nil)
+    assert(labels(1, 10, 2) === Seq("1", "2", ">"))
+    assert(labels(2, 10, 2) === Seq("<", "1", "2"))
+
+    // page 1 and page 2 with totalPages = 100
+    assert(labels(1, 10, 100) === pages(1, 10) ++ Seq(">", ">>"))
+    assert(labels(2, 10, 100) === Seq("<") ++ pages(1, 10) ++ Seq(">", ">>"))
+
+    // page 100 and page 99 with totalPages = 100
+    assert(labels(100, 10, 100) === Seq("<<", "<") ++ pages(91, 100))
+    assert(labels(99, 10, 100) === Seq("<<", "<") ++ pages(91, 100) ++ Seq(">"))
+
+    // page 11 with totalPages = 100, then page 93 with totalPages = 97
+    assert(labels(11, 10, 100) === Seq("<<", "<") ++ pages(11, 20) ++ Seq(">", ">>"))
+    assert(labels(93, 10, 97) === Seq("<<", "<") ++ pages(91, 97) ++ Seq(">"))
+  }
+}
+
+/** A `PagedDataSource` over an in-memory `Seq`; `PagedDataSourceSuite` pages through it. */
+private[spark] class SeqPagedDataSource[T](seq: Seq[T], pageSize: Int)
+  extends PagedDataSource[T](pageSize) {
+  override protected def dataSize: Int = seq.length
+  // Delegates to Seq.slice, so `from` is inclusive and `to` is exclusive.
+  override protected def sliceData(from: Int, to: Int): Seq[T] = seq.slice(from, to)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org