Posted to commits@spark.apache.org by sr...@apache.org on 2016/10/08 10:24:07 UTC

spark git commit: [SPARK-17793][WEB UI] Sorting on the description on the Job or Stage page doesn’t always work

Repository: spark
Updated Branches:
  refs/heads/master 471690f90 -> 362ba4b6f


[SPARK-17793][WEB UI] Sorting on the description on the Job or Stage page doesn't always work

## What changes were proposed in this pull request?

Added secondary sorting on stage name for the description column. This provides clearer behavior in the common case where the Description column contains only stage names rather than the optional description value.
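
The core of the change is sorting by a `(description, stage name)` tuple so that stage name acts as the tie-breaker. A minimal sketch of the idea, using an illustrative `Row` class (an assumption standing in for Spark's `JobTableRowData`/`StageTableRowData`):

```scala
import scala.math.Ordering

// Illustrative stand-in for the UI row classes; field names are assumptions.
case class Row(description: Option[String], stageName: String)

// Sorting by a tuple makes stage name the secondary key, so rows with empty or
// identical descriptions still sort deterministically by stage name.
val byDescriptionThenName: Ordering[Row] =
  Ordering.by(r => (r.description, r.stageName))

val rows = Seq(
  Row(None, "reduce at WordCount.scala:20"),
  Row(None, "map at WordCount.scala:15"),
  Row(Some("count job"), "count at WordCount.scala:30"))

// None sorts before Some(...); ties on description fall back to stage name.
rows.sorted(byDescriptionThenName).foreach(println)
```

The chosen per-column ordering is then flipped for descending sorts in the `if (desc)` branch visible at the end of each `ordering` method in the diff below.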

## How was this patch tested?

Manual testing and dev/run-tests.

Screenshots of sorting on description and on stage name, as well as an example of each:
![screen shot 2016-10-04 at 1 09 39 pm](https://cloud.githubusercontent.com/assets/13952758/19135523/067b042e-8b1a-11e6-912e-e6371d006d21.png)
![screen shot 2016-10-04 at 1 09 51 pm](https://cloud.githubusercontent.com/assets/13952758/19135526/06960936-8b1a-11e6-85e9-8aaf694c5f7b.png)
![screen shot 2016-10-05 at 1 14 45 pm](https://cloud.githubusercontent.com/assets/13952758/19135525/069547da-8b1a-11e6-8692-6524c75c4c07.png)
![screen shot 2016-10-05 at 1 14 51 pm](https://cloud.githubusercontent.com/assets/13952758/19135524/0694b4d2-8b1a-11e6-92dc-c8aa514e4f62.png)
![screen shot 2016-10-05 at 4 42 52 pm](https://cloud.githubusercontent.com/assets/13952758/19135618/e232eafe-8b1a-11e6-88b3-ff0bbb26b7f8.png)

Author: Alex Bozarth <aj...@us.ibm.com>

Closes #15366 from ajbozarth/spark17793.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/362ba4b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/362ba4b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/362ba4b6

Branch: refs/heads/master
Commit: 362ba4b6f8e8fc2355368742c5adced7573fec00
Parents: 471690f
Author: Alex Bozarth <aj...@us.ibm.com>
Authored: Sat Oct 8 11:24:00 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Oct 8 11:24:00 2016 +0100

----------------------------------------------------------------------
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  |  25 +---
 .../org/apache/spark/ui/jobs/StagePage.scala    | 134 ++++---------------
 .../org/apache/spark/ui/jobs/StageTable.scala   |  51 ++-----
 .../org/apache/spark/ui/storage/RDDPage.scala   |  27 +---
 4 files changed, 49 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/362ba4b6/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 19bb41a..f671309 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -457,23 +457,11 @@ private[ui] class JobDataSource(
    * Return Ordering according to sortColumn and desc
    */
   private def ordering(sortColumn: String, desc: Boolean): Ordering[JobTableRowData] = {
-    val ordering = sortColumn match {
-      case "Job Id" | "Job Id (Job Group)" => new Ordering[JobTableRowData] {
-        override def compare(x: JobTableRowData, y: JobTableRowData): Int =
-          Ordering.Int.compare(x.jobData.jobId, y.jobData.jobId)
-      }
-      case "Description" => new Ordering[JobTableRowData] {
-        override def compare(x: JobTableRowData, y: JobTableRowData): Int =
-          Ordering.String.compare(x.lastStageDescription, y.lastStageDescription)
-      }
-      case "Submitted" => new Ordering[JobTableRowData] {
-        override def compare(x: JobTableRowData, y: JobTableRowData): Int =
-          Ordering.Long.compare(x.submissionTime, y.submissionTime)
-      }
-      case "Duration" => new Ordering[JobTableRowData] {
-        override def compare(x: JobTableRowData, y: JobTableRowData): Int =
-          Ordering.Long.compare(x.duration, y.duration)
-      }
+    val ordering: Ordering[JobTableRowData] = sortColumn match {
+      case "Job Id" | "Job Id (Job Group)" => Ordering.by(_.jobData.jobId)
+      case "Description" => Ordering.by(x => (x.lastStageDescription, x.lastStageName))
+      case "Submitted" => Ordering.by(_.submissionTime)
+      case "Duration" => Ordering.by(_.duration)
       case "Stages: Succeeded/Total" | "Tasks (for all stages): Succeeded/Total" =>
         throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
       case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
@@ -501,8 +489,7 @@ private[ui] class JobPagedTable(
     sortColumn: String,
     desc: Boolean
   ) extends PagedTable[JobTableRowData] {
-  val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
-    parameterOtherTable.mkString("&")
+  val parameterPath = basePath + s"/$subPath/?" + parameterOtherTable.mkString("&")
 
   override def tableId: String = jobTag + "-table"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/362ba4b6/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index c322ae0..8c7cefe 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -1050,89 +1050,38 @@ private[ui] class TaskDataSource(
    * Return Ordering according to sortColumn and desc
    */
   private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = {
-    val ordering = sortColumn match {
-      case "Index" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Int.compare(x.index, y.index)
-      }
-      case "ID" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Long.compare(x.taskId, y.taskId)
-      }
-      case "Attempt" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Int.compare(x.attempt, y.attempt)
-      }
-      case "Status" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.String.compare(x.status, y.status)
-      }
-      case "Locality Level" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.String.compare(x.taskLocality, y.taskLocality)
-      }
-      case "Executor ID / Host" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.String.compare(x.executorIdAndHost, y.executorIdAndHost)
-      }
-      case "Launch Time" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Long.compare(x.launchTime, y.launchTime)
-      }
-      case "Duration" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Long.compare(x.duration, y.duration)
-      }
-      case "Scheduler Delay" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Long.compare(x.schedulerDelay, y.schedulerDelay)
-      }
-      case "Task Deserialization Time" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Long.compare(x.taskDeserializationTime, y.taskDeserializationTime)
-      }
-      case "GC Time" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Long.compare(x.gcTime, y.gcTime)
-      }
-      case "Result Serialization Time" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Long.compare(x.serializationTime, y.serializationTime)
-      }
-      case "Getting Result Time" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Long.compare(x.gettingResultTime, y.gettingResultTime)
-      }
-      case "Peak Execution Memory" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.Long.compare(x.peakExecutionMemoryUsed, y.peakExecutionMemoryUsed)
-      }
+    val ordering: Ordering[TaskTableRowData] = sortColumn match {
+      case "Index" => Ordering.by(_.index)
+      case "ID" => Ordering.by(_.taskId)
+      case "Attempt" => Ordering.by(_.attempt)
+      case "Status" => Ordering.by(_.status)
+      case "Locality Level" => Ordering.by(_.taskLocality)
+      case "Executor ID / Host" => Ordering.by(_.executorIdAndHost)
+      case "Launch Time" => Ordering.by(_.launchTime)
+      case "Duration" => Ordering.by(_.duration)
+      case "Scheduler Delay" => Ordering.by(_.schedulerDelay)
+      case "Task Deserialization Time" => Ordering.by(_.taskDeserializationTime)
+      case "GC Time" => Ordering.by(_.gcTime)
+      case "Result Serialization Time" => Ordering.by(_.serializationTime)
+      case "Getting Result Time" => Ordering.by(_.gettingResultTime)
+      case "Peak Execution Memory" => Ordering.by(_.peakExecutionMemoryUsed)
       case "Accumulators" =>
         if (hasAccumulators) {
-          new Ordering[TaskTableRowData] {
-            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-              Ordering.String.compare(x.accumulators.get, y.accumulators.get)
-          }
+          Ordering.by(_.accumulators.get)
         } else {
           throw new IllegalArgumentException(
             "Cannot sort by Accumulators because of no accumulators")
         }
       case "Input Size / Records" =>
         if (hasInput) {
-          new Ordering[TaskTableRowData] {
-            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-              Ordering.Long.compare(x.input.get.inputSortable, y.input.get.inputSortable)
-          }
+          Ordering.by(_.input.get.inputSortable)
         } else {
           throw new IllegalArgumentException(
             "Cannot sort by Input Size / Records because of no inputs")
         }
       case "Output Size / Records" =>
         if (hasOutput) {
-          new Ordering[TaskTableRowData] {
-            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-              Ordering.Long.compare(x.output.get.outputSortable, y.output.get.outputSortable)
-          }
+          Ordering.by(_.output.get.outputSortable)
         } else {
           throw new IllegalArgumentException(
             "Cannot sort by Output Size / Records because of no outputs")
@@ -1140,33 +1089,21 @@ private[ui] class TaskDataSource(
       // ShuffleRead
       case "Shuffle Read Blocked Time" =>
         if (hasShuffleRead) {
-          new Ordering[TaskTableRowData] {
-            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-              Ordering.Long.compare(x.shuffleRead.get.shuffleReadBlockedTimeSortable,
-                y.shuffleRead.get.shuffleReadBlockedTimeSortable)
-          }
+          Ordering.by(_.shuffleRead.get.shuffleReadBlockedTimeSortable)
         } else {
           throw new IllegalArgumentException(
             "Cannot sort by Shuffle Read Blocked Time because of no shuffle reads")
         }
       case "Shuffle Read Size / Records" =>
         if (hasShuffleRead) {
-          new Ordering[TaskTableRowData] {
-            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-              Ordering.Long.compare(x.shuffleRead.get.shuffleReadSortable,
-                y.shuffleRead.get.shuffleReadSortable)
-          }
+          Ordering.by(_.shuffleRead.get.shuffleReadSortable)
         } else {
           throw new IllegalArgumentException(
             "Cannot sort by Shuffle Read Size / Records because of no shuffle reads")
         }
       case "Shuffle Remote Reads" =>
         if (hasShuffleRead) {
-          new Ordering[TaskTableRowData] {
-            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-              Ordering.Long.compare(x.shuffleRead.get.shuffleReadRemoteSortable,
-                y.shuffleRead.get.shuffleReadRemoteSortable)
-          }
+          Ordering.by(_.shuffleRead.get.shuffleReadRemoteSortable)
         } else {
           throw new IllegalArgumentException(
             "Cannot sort by Shuffle Remote Reads because of no shuffle reads")
@@ -1174,22 +1111,14 @@ private[ui] class TaskDataSource(
       // ShuffleWrite
       case "Write Time" =>
         if (hasShuffleWrite) {
-          new Ordering[TaskTableRowData] {
-            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-              Ordering.Long.compare(x.shuffleWrite.get.writeTimeSortable,
-                y.shuffleWrite.get.writeTimeSortable)
-          }
+          Ordering.by(_.shuffleWrite.get.writeTimeSortable)
         } else {
           throw new IllegalArgumentException(
             "Cannot sort by Write Time because of no shuffle writes")
         }
       case "Shuffle Write Size / Records" =>
         if (hasShuffleWrite) {
-          new Ordering[TaskTableRowData] {
-            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-              Ordering.Long.compare(x.shuffleWrite.get.shuffleWriteSortable,
-                y.shuffleWrite.get.shuffleWriteSortable)
-          }
+          Ordering.by(_.shuffleWrite.get.shuffleWriteSortable)
         } else {
           throw new IllegalArgumentException(
             "Cannot sort by Shuffle Write Size / Records because of no shuffle writes")
@@ -1197,30 +1126,19 @@ private[ui] class TaskDataSource(
       // BytesSpilled
       case "Shuffle Spill (Memory)" =>
         if (hasBytesSpilled) {
-          new Ordering[TaskTableRowData] {
-            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-              Ordering.Long.compare(x.bytesSpilled.get.memoryBytesSpilledSortable,
-                y.bytesSpilled.get.memoryBytesSpilledSortable)
-          }
+          Ordering.by(_.bytesSpilled.get.memoryBytesSpilledSortable)
         } else {
           throw new IllegalArgumentException(
             "Cannot sort by Shuffle Spill (Memory) because of no spills")
         }
       case "Shuffle Spill (Disk)" =>
         if (hasBytesSpilled) {
-          new Ordering[TaskTableRowData] {
-            override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-              Ordering.Long.compare(x.bytesSpilled.get.diskBytesSpilledSortable,
-                y.bytesSpilled.get.diskBytesSpilledSortable)
-          }
+          Ordering.by(_.bytesSpilled.get.diskBytesSpilledSortable)
         } else {
           throw new IllegalArgumentException(
             "Cannot sort by Shuffle Spill (Disk) because of no spills")
         }
-      case "Errors" => new Ordering[TaskTableRowData] {
-        override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
-          Ordering.String.compare(x.error, y.error)
-      }
+      case "Errors" => Ordering.by(_.error)
       case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
     }
     if (desc) {

http://git-wip-us.apache.org/repos/asf/spark/blob/362ba4b6/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 40a6762..9b9b468 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -109,7 +109,6 @@ private[ui] class StageTableRowData(
     val stageId: Int,
     val attemptId: Int,
     val schedulingPool: String,
-    val description: String,
     val descriptionOption: Option[String],
     val submissionTime: Long,
     val formattedSubmissionTime: String,
@@ -128,7 +127,7 @@ private[ui] class MissingStageTableRowData(
     stageInfo: StageInfo,
     stageId: Int,
     attemptId: Int) extends StageTableRowData(
-  stageInfo, None, stageId, attemptId, "", "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "")
+  stageInfo, None, stageId, attemptId, "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "")
 
 /** Page showing list of all ongoing and recently finished stages */
 private[ui] class StagePagedTable(
@@ -470,7 +469,6 @@ private[ui] class StageDataSource(
       s.stageId,
       s.attemptId,
       stageData.schedulingPool,
-      description.getOrElse(""),
       description,
       s.submissionTime.getOrElse(0),
       formattedSubmissionTime,
@@ -491,43 +489,16 @@ private[ui] class StageDataSource(
    * Return Ordering according to sortColumn and desc
    */
   private def ordering(sortColumn: String, desc: Boolean): Ordering[StageTableRowData] = {
-    val ordering = sortColumn match {
-      case "Stage Id" => new Ordering[StageTableRowData] {
-        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
-          Ordering.Int.compare(x.stageId, y.stageId)
-      }
-      case "Pool Name" => new Ordering[StageTableRowData] {
-        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
-          Ordering.String.compare(x.schedulingPool, y.schedulingPool)
-      }
-      case "Description" => new Ordering[StageTableRowData] {
-        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
-          Ordering.String.compare(x.description, y.description)
-      }
-      case "Submitted" => new Ordering[StageTableRowData] {
-        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
-          Ordering.Long.compare(x.submissionTime, y.submissionTime)
-      }
-      case "Duration" => new Ordering[StageTableRowData] {
-        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
-          Ordering.Long.compare(x.duration, y.duration)
-      }
-      case "Input" => new Ordering[StageTableRowData] {
-        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
-          Ordering.Long.compare(x.inputRead, y.inputRead)
-      }
-      case "Output" => new Ordering[StageTableRowData] {
-        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
-          Ordering.Long.compare(x.outputWrite, y.outputWrite)
-      }
-      case "Shuffle Read" => new Ordering[StageTableRowData] {
-        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
-          Ordering.Long.compare(x.shuffleRead, y.shuffleRead)
-      }
-      case "Shuffle Write" => new Ordering[StageTableRowData] {
-        override def compare(x: StageTableRowData, y: StageTableRowData): Int =
-          Ordering.Long.compare(x.shuffleWrite, y.shuffleWrite)
-      }
+    val ordering: Ordering[StageTableRowData] = sortColumn match {
+      case "Stage Id" => Ordering.by(_.stageId)
+      case "Pool Name" => Ordering.by(_.schedulingPool)
+      case "Description" => Ordering.by(x => (x.descriptionOption, x.stageInfo.name))
+      case "Submitted" => Ordering.by(_.submissionTime)
+      case "Duration" => Ordering.by(_.duration)
+      case "Input" => Ordering.by(_.inputRead)
+      case "Output" => Ordering.by(_.outputWrite)
+      case "Shuffle Read" => Ordering.by(_.shuffleRead)
+      case "Shuffle Write" => Ordering.by(_.shuffleWrite)
       case "Tasks: Succeeded/Total" =>
         throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
       case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")

http://git-wip-us.apache.org/repos/asf/spark/blob/362ba4b6/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 606d15d..227e940 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -197,27 +197,12 @@ private[ui] class BlockDataSource(
    * Return Ordering according to sortColumn and desc
    */
   private def ordering(sortColumn: String, desc: Boolean): Ordering[BlockTableRowData] = {
-    val ordering = sortColumn match {
-      case "Block Name" => new Ordering[BlockTableRowData] {
-        override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
-          Ordering.String.compare(x.blockName, y.blockName)
-      }
-      case "Storage Level" => new Ordering[BlockTableRowData] {
-        override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
-          Ordering.String.compare(x.storageLevel, y.storageLevel)
-      }
-      case "Size in Memory" => new Ordering[BlockTableRowData] {
-        override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
-          Ordering.Long.compare(x.memoryUsed, y.memoryUsed)
-      }
-      case "Size on Disk" => new Ordering[BlockTableRowData] {
-        override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
-          Ordering.Long.compare(x.diskUsed, y.diskUsed)
-      }
-      case "Executors" => new Ordering[BlockTableRowData] {
-        override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
-          Ordering.String.compare(x.executors, y.executors)
-      }
+    val ordering: Ordering[BlockTableRowData] = sortColumn match {
+      case "Block Name" => Ordering.by(_.blockName)
+      case "Storage Level" => Ordering.by(_.storageLevel)
+      case "Size in Memory" => Ordering.by(_.memoryUsed)
+      case "Size on Disk" => Ordering.by(_.diskUsed)
+      case "Executors" => Ordering.by(_.executors)
       case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
     }
     if (desc) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org