You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2018/02/15 19:51:33 UTC

spark git commit: [SPARK-23413][UI] Fix sorting tasks by Host / Executor ID at the Stage page

Repository: spark
Updated Branches:
  refs/heads/master db45daab9 -> 1dc2c1d5e


[SPARK-23413][UI] Fix sorting tasks by Host / Executor ID at the Stage page

## What changes were proposed in this pull request?

Fixing exception got at sorting tasks by Host / Executor ID:
```
        java.lang.IllegalArgumentException: Invalid sort column: Host
	at org.apache.spark.ui.jobs.ApiHelper$.indexName(StagePage.scala:1017)
	at org.apache.spark.ui.jobs.TaskDataSource.sliceData(StagePage.scala:694)
	at org.apache.spark.ui.PagedDataSource.pageData(PagedTable.scala:61)
	at org.apache.spark.ui.PagedTable$class.table(PagedTable.scala:96)
	at org.apache.spark.ui.jobs.TaskPagedTable.table(StagePage.scala:708)
	at org.apache.spark.ui.jobs.StagePage.liftedTree1$1(StagePage.scala:293)
	at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:282)
	at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
	at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
	at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
	at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
	at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
```

Moreover some refactoring to avoid similar problems by introducing constants for each header name and reusing them at the identification of the corresponding sorting index.

## How was this patch tested?

Manually:

![screen shot 2018-02-13 at 18 57 10](https://user-images.githubusercontent.com/2017933/36166532-1cfdf3b8-10f3-11e8-8d32-5fcaad2af214.png)

Author: “attilapiros” <pi...@gmail.com>

Closes #20601 from attilapiros/SPARK-23413.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1dc2c1d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1dc2c1d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1dc2c1d5

Branch: refs/heads/master
Commit: 1dc2c1d5e85c5f404f470aeb44c1f3c22786bdea
Parents: db45daa
Author: “attilapiros” <pi...@gmail.com>
Authored: Thu Feb 15 13:51:24 2018 -0600
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Thu Feb 15 13:51:24 2018 -0600

----------------------------------------------------------------------
 .../org/apache/spark/status/storeTypes.scala    |   2 +
 .../org/apache/spark/ui/jobs/StagePage.scala    | 121 ++++++++++++-------
 .../org/apache/spark/ui/StagePageSuite.scala    |  63 +++++++++-
 3 files changed, 139 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1dc2c1d5/core/src/main/scala/org/apache/spark/status/storeTypes.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index 412644d..646cf25 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -109,6 +109,7 @@ private[spark] object TaskIndexNames {
   final val DURATION = "dur"
   final val ERROR = "err"
   final val EXECUTOR = "exe"
+  final val HOST = "hst"
   final val EXEC_CPU_TIME = "ect"
   final val EXEC_RUN_TIME = "ert"
   final val GC_TIME = "gc"
@@ -165,6 +166,7 @@ private[spark] class TaskDataWrapper(
     val duration: Long,
     @KVIndexParam(value = TaskIndexNames.EXECUTOR, parent = TaskIndexNames.STAGE)
     val executorId: String,
+    @KVIndexParam(value = TaskIndexNames.HOST, parent = TaskIndexNames.STAGE)
     val host: String,
     @KVIndexParam(value = TaskIndexNames.STATUS, parent = TaskIndexNames.STAGE)
     val status: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/1dc2c1d5/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 5c2b0c3..a9265d4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -750,37 +750,39 @@ private[ui] class TaskPagedTable(
   }
 
   def headers: Seq[Node] = {
+    import ApiHelper._
+
     val taskHeadersAndCssClasses: Seq[(String, String)] =
       Seq(
-        ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
-        ("Executor ID", ""), ("Host", ""), ("Launch Time", ""), ("Duration", ""),
-        ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
-        ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
-        ("GC Time", ""),
-        ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
-        ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
-        ("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
-        {if (hasAccumulators(stage)) Seq(("Accumulators", "")) else Nil} ++
-        {if (hasInput(stage)) Seq(("Input Size / Records", "")) else Nil} ++
-        {if (hasOutput(stage)) Seq(("Output Size / Records", "")) else Nil} ++
+        (HEADER_TASK_INDEX, ""), (HEADER_ID, ""), (HEADER_ATTEMPT, ""), (HEADER_STATUS, ""),
+        (HEADER_LOCALITY, ""), (HEADER_EXECUTOR, ""), (HEADER_HOST, ""), (HEADER_LAUNCH_TIME, ""),
+        (HEADER_DURATION, ""), (HEADER_SCHEDULER_DELAY, TaskDetailsClassNames.SCHEDULER_DELAY),
+        (HEADER_DESER_TIME, TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
+        (HEADER_GC_TIME, ""),
+        (HEADER_SER_TIME, TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
+        (HEADER_GETTING_RESULT_TIME, TaskDetailsClassNames.GETTING_RESULT_TIME),
+        (HEADER_PEAK_MEM, TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
+        {if (hasAccumulators(stage)) Seq((HEADER_ACCUMULATORS, "")) else Nil} ++
+        {if (hasInput(stage)) Seq((HEADER_INPUT_SIZE, "")) else Nil} ++
+        {if (hasOutput(stage)) Seq((HEADER_OUTPUT_SIZE, "")) else Nil} ++
         {if (hasShuffleRead(stage)) {
-          Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
-            ("Shuffle Read Size / Records", ""),
-            ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
+          Seq((HEADER_SHUFFLE_READ_TIME, TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
+            (HEADER_SHUFFLE_TOTAL_READS, ""),
+            (HEADER_SHUFFLE_REMOTE_READS, TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
         } else {
           Nil
         }} ++
         {if (hasShuffleWrite(stage)) {
-          Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
+          Seq((HEADER_SHUFFLE_WRITE_TIME, ""), (HEADER_SHUFFLE_WRITE_SIZE, ""))
         } else {
           Nil
         }} ++
         {if (hasBytesSpilled(stage)) {
-          Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
+          Seq((HEADER_MEM_SPILL, ""), (HEADER_DISK_SPILL, ""))
         } else {
           Nil
         }} ++
-        Seq(("Errors", ""))
+        Seq((HEADER_ERROR, ""))
 
     if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
       throw new IllegalArgumentException(s"Unknown column: $sortColumn")
@@ -961,35 +963,62 @@ private[ui] class TaskPagedTable(
   }
 }
 
-private object ApiHelper {
-
-
-  private val COLUMN_TO_INDEX = Map(
-    "ID" -> null.asInstanceOf[String],
-    "Index" -> TaskIndexNames.TASK_INDEX,
-    "Attempt" -> TaskIndexNames.ATTEMPT,
-    "Status" -> TaskIndexNames.STATUS,
-    "Locality Level" -> TaskIndexNames.LOCALITY,
-    "Executor ID / Host" -> TaskIndexNames.EXECUTOR,
-    "Launch Time" -> TaskIndexNames.LAUNCH_TIME,
-    "Duration" -> TaskIndexNames.DURATION,
-    "Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY,
-    "Task Deserialization Time" -> TaskIndexNames.DESER_TIME,
-    "GC Time" -> TaskIndexNames.GC_TIME,
-    "Result Serialization Time" -> TaskIndexNames.SER_TIME,
-    "Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME,
-    "Peak Execution Memory" -> TaskIndexNames.PEAK_MEM,
-    "Accumulators" -> TaskIndexNames.ACCUMULATORS,
-    "Input Size / Records" -> TaskIndexNames.INPUT_SIZE,
-    "Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE,
-    "Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME,
-    "Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS,
-    "Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS,
-    "Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME,
-    "Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
-    "Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL,
-    "Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL,
-    "Errors" -> TaskIndexNames.ERROR)
+private[ui] object ApiHelper {
+
+  val HEADER_ID = "ID"
+  val HEADER_TASK_INDEX = "Index"
+  val HEADER_ATTEMPT = "Attempt"
+  val HEADER_STATUS = "Status"
+  val HEADER_LOCALITY = "Locality Level"
+  val HEADER_EXECUTOR = "Executor ID"
+  val HEADER_HOST = "Host"
+  val HEADER_LAUNCH_TIME = "Launch Time"
+  val HEADER_DURATION = "Duration"
+  val HEADER_SCHEDULER_DELAY = "Scheduler Delay"
+  val HEADER_DESER_TIME = "Task Deserialization Time"
+  val HEADER_GC_TIME = "GC Time"
+  val HEADER_SER_TIME = "Result Serialization Time"
+  val HEADER_GETTING_RESULT_TIME = "Getting Result Time"
+  val HEADER_PEAK_MEM = "Peak Execution Memory"
+  val HEADER_ACCUMULATORS = "Accumulators"
+  val HEADER_INPUT_SIZE = "Input Size / Records"
+  val HEADER_OUTPUT_SIZE = "Output Size / Records"
+  val HEADER_SHUFFLE_READ_TIME = "Shuffle Read Blocked Time"
+  val HEADER_SHUFFLE_TOTAL_READS = "Shuffle Read Size / Records"
+  val HEADER_SHUFFLE_REMOTE_READS = "Shuffle Remote Reads"
+  val HEADER_SHUFFLE_WRITE_TIME = "Write Time"
+  val HEADER_SHUFFLE_WRITE_SIZE = "Shuffle Write Size / Records"
+  val HEADER_MEM_SPILL = "Shuffle Spill (Memory)"
+  val HEADER_DISK_SPILL = "Shuffle Spill (Disk)"
+  val HEADER_ERROR = "Errors"
+
+  private[ui] val COLUMN_TO_INDEX = Map(
+    HEADER_ID -> null.asInstanceOf[String],
+    HEADER_TASK_INDEX -> TaskIndexNames.TASK_INDEX,
+    HEADER_ATTEMPT -> TaskIndexNames.ATTEMPT,
+    HEADER_STATUS -> TaskIndexNames.STATUS,
+    HEADER_LOCALITY -> TaskIndexNames.LOCALITY,
+    HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR,
+    HEADER_HOST -> TaskIndexNames.HOST,
+    HEADER_LAUNCH_TIME -> TaskIndexNames.LAUNCH_TIME,
+    HEADER_DURATION -> TaskIndexNames.DURATION,
+    HEADER_SCHEDULER_DELAY -> TaskIndexNames.SCHEDULER_DELAY,
+    HEADER_DESER_TIME -> TaskIndexNames.DESER_TIME,
+    HEADER_GC_TIME -> TaskIndexNames.GC_TIME,
+    HEADER_SER_TIME -> TaskIndexNames.SER_TIME,
+    HEADER_GETTING_RESULT_TIME -> TaskIndexNames.GETTING_RESULT_TIME,
+    HEADER_PEAK_MEM -> TaskIndexNames.PEAK_MEM,
+    HEADER_ACCUMULATORS -> TaskIndexNames.ACCUMULATORS,
+    HEADER_INPUT_SIZE -> TaskIndexNames.INPUT_SIZE,
+    HEADER_OUTPUT_SIZE -> TaskIndexNames.OUTPUT_SIZE,
+    HEADER_SHUFFLE_READ_TIME -> TaskIndexNames.SHUFFLE_READ_TIME,
+    HEADER_SHUFFLE_TOTAL_READS -> TaskIndexNames.SHUFFLE_TOTAL_READS,
+    HEADER_SHUFFLE_REMOTE_READS -> TaskIndexNames.SHUFFLE_REMOTE_READS,
+    HEADER_SHUFFLE_WRITE_TIME -> TaskIndexNames.SHUFFLE_WRITE_TIME,
+    HEADER_SHUFFLE_WRITE_SIZE -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
+    HEADER_MEM_SPILL -> TaskIndexNames.MEM_SPILL,
+    HEADER_DISK_SPILL -> TaskIndexNames.DISK_SPILL,
+    HEADER_ERROR -> TaskIndexNames.ERROR)
 
   def hasAccumulators(stageData: StageData): Boolean = {
     stageData.accumulatorUpdates.exists { acc => acc.name != null && acc.value != null }

http://git-wip-us.apache.org/repos/asf/spark/blob/1dc2c1d5/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 0aeddf7..6044563 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -28,13 +28,74 @@ import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1.{AccumulableInfo => UIAccumulableInfo, StageData, StageStatus}
 import org.apache.spark.status.config._
-import org.apache.spark.ui.jobs.{StagePage, StagesTab}
+import org.apache.spark.ui.jobs.{ApiHelper, StagePage, StagesTab, TaskPagedTable}
 
 class StagePageSuite extends SparkFunSuite with LocalSparkContext {
 
   private val peakExecutionMemory = 10
 
+  test("ApiHelper.COLUMN_TO_INDEX should match headers of the task table") {
+    val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+    val statusStore = AppStatusStore.createLiveStore(conf)
+    try {
+      val stageData = new StageData(
+        status = StageStatus.ACTIVE,
+        stageId = 1,
+        attemptId = 1,
+        numTasks = 1,
+        numActiveTasks = 1,
+        numCompleteTasks = 1,
+        numFailedTasks = 1,
+        numKilledTasks = 1,
+        numCompletedIndices = 1,
+
+        executorRunTime = 1L,
+        executorCpuTime = 1L,
+        submissionTime = None,
+        firstTaskLaunchedTime = None,
+        completionTime = None,
+        failureReason = None,
+
+        inputBytes = 1L,
+        inputRecords = 1L,
+        outputBytes = 1L,
+        outputRecords = 1L,
+        shuffleReadBytes = 1L,
+        shuffleReadRecords = 1L,
+        shuffleWriteBytes = 1L,
+        shuffleWriteRecords = 1L,
+        memoryBytesSpilled = 1L,
+        diskBytesSpilled = 1L,
+
+        name = "stage1",
+        description = Some("description"),
+        details = "detail",
+        schedulingPool = "pool1",
+
+        rddIds = Seq(1),
+        accumulatorUpdates = Seq(new UIAccumulableInfo(0L, "acc", None, "value")),
+        tasks = None,
+        executorSummary = None,
+        killedTasksSummary = Map.empty
+      )
+      val taskTable = new TaskPagedTable(
+        stageData,
+        basePath = "/a/b/c",
+        currentTime = 0,
+        pageSize = 10,
+        sortColumn = "Index",
+        desc = false,
+        store = statusStore
+      )
+      val columnNames = (taskTable.headers \ "th" \ "a").map(_.child(1).text).toSet
+      assert(columnNames === ApiHelper.COLUMN_TO_INDEX.keySet)
+    } finally {
+      statusStore.close()
+    }
+  }
+
   test("peak execution memory should displayed") {
     val html = renderStagePage().toString().toLowerCase(Locale.ROOT)
     val targetString = "peak execution memory"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org