You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2018/02/15 20:03:53 UTC
spark git commit: [SPARK-23413][UI] Fix sorting tasks by Host / Executor ID at the Stage page
Repository: spark
Updated Branches:
refs/heads/branch-2.3 0bd7765cd -> 75bb19a01
[SPARK-23413][UI] Fix sorting tasks by Host / Executor ID at the Stage page
## What changes were proposed in this pull request?
This fixes the exception thrown when sorting tasks by Host / Executor ID:
```
java.lang.IllegalArgumentException: Invalid sort column: Host
at org.apache.spark.ui.jobs.ApiHelper$.indexName(StagePage.scala:1017)
at org.apache.spark.ui.jobs.TaskDataSource.sliceData(StagePage.scala:694)
at org.apache.spark.ui.PagedDataSource.pageData(PagedTable.scala:61)
at org.apache.spark.ui.PagedTable$class.table(PagedTable.scala:96)
at org.apache.spark.ui.jobs.TaskPagedTable.table(StagePage.scala:708)
at org.apache.spark.ui.jobs.StagePage.liftedTree1$1(StagePage.scala:293)
at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:282)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
```
In addition, the code is refactored to avoid similar problems: a constant is introduced for each header name and reused when identifying the corresponding sorting index.
## How was this patch tested?
Manually:
![screen shot 2018-02-13 at 18 57 10](https://user-images.githubusercontent.com/2017933/36166532-1cfdf3b8-10f3-11e8-8d32-5fcaad2af214.png)
(cherry picked from commit 1dc2c1d5e85c5f404f470aeb44c1f3c22786bdea)
Author: “attilapiros” <pi...@gmail.com>
Closes #20623 from squito/fix_backport.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75bb19a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75bb19a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75bb19a0
Branch: refs/heads/branch-2.3
Commit: 75bb19a018f9260eab3ea0ba3ea46e84b87eabf2
Parents: 0bd7765
Author: “attilapiros” <pi...@gmail.com>
Authored: Thu Feb 15 14:03:41 2018 -0600
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Thu Feb 15 14:03:41 2018 -0600
----------------------------------------------------------------------
.../org/apache/spark/status/storeTypes.scala | 2 +
.../org/apache/spark/ui/jobs/StagePage.scala | 121 ++++++++++++-------
.../org/apache/spark/ui/StagePageSuite.scala | 63 +++++++++-
3 files changed, 139 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/75bb19a0/core/src/main/scala/org/apache/spark/status/storeTypes.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index 412644d..646cf25 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -109,6 +109,7 @@ private[spark] object TaskIndexNames {
final val DURATION = "dur"
final val ERROR = "err"
final val EXECUTOR = "exe"
+ final val HOST = "hst"
final val EXEC_CPU_TIME = "ect"
final val EXEC_RUN_TIME = "ert"
final val GC_TIME = "gc"
@@ -165,6 +166,7 @@ private[spark] class TaskDataWrapper(
val duration: Long,
@KVIndexParam(value = TaskIndexNames.EXECUTOR, parent = TaskIndexNames.STAGE)
val executorId: String,
+ @KVIndexParam(value = TaskIndexNames.HOST, parent = TaskIndexNames.STAGE)
val host: String,
@KVIndexParam(value = TaskIndexNames.STATUS, parent = TaskIndexNames.STAGE)
val status: String,
http://git-wip-us.apache.org/repos/asf/spark/blob/75bb19a0/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8af2537..a4710f6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -742,37 +742,39 @@ private[ui] class TaskPagedTable(
}
def headers: Seq[Node] = {
+ import ApiHelper._
+
val taskHeadersAndCssClasses: Seq[(String, String)] =
Seq(
- ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
- ("Executor ID", ""), ("Host", ""), ("Launch Time", ""), ("Duration", ""),
- ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
- ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
- ("GC Time", ""),
- ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
- ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
- ("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
- {if (hasAccumulators(stage)) Seq(("Accumulators", "")) else Nil} ++
- {if (hasInput(stage)) Seq(("Input Size / Records", "")) else Nil} ++
- {if (hasOutput(stage)) Seq(("Output Size / Records", "")) else Nil} ++
+ (HEADER_TASK_INDEX, ""), (HEADER_ID, ""), (HEADER_ATTEMPT, ""), (HEADER_STATUS, ""),
+ (HEADER_LOCALITY, ""), (HEADER_EXECUTOR, ""), (HEADER_HOST, ""), (HEADER_LAUNCH_TIME, ""),
+ (HEADER_DURATION, ""), (HEADER_SCHEDULER_DELAY, TaskDetailsClassNames.SCHEDULER_DELAY),
+ (HEADER_DESER_TIME, TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
+ (HEADER_GC_TIME, ""),
+ (HEADER_SER_TIME, TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
+ (HEADER_GETTING_RESULT_TIME, TaskDetailsClassNames.GETTING_RESULT_TIME),
+ (HEADER_PEAK_MEM, TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
+ {if (hasAccumulators(stage)) Seq((HEADER_ACCUMULATORS, "")) else Nil} ++
+ {if (hasInput(stage)) Seq((HEADER_INPUT_SIZE, "")) else Nil} ++
+ {if (hasOutput(stage)) Seq((HEADER_OUTPUT_SIZE, "")) else Nil} ++
{if (hasShuffleRead(stage)) {
- Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
- ("Shuffle Read Size / Records", ""),
- ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
+ Seq((HEADER_SHUFFLE_READ_TIME, TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
+ (HEADER_SHUFFLE_TOTAL_READS, ""),
+ (HEADER_SHUFFLE_REMOTE_READS, TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
} else {
Nil
}} ++
{if (hasShuffleWrite(stage)) {
- Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
+ Seq((HEADER_SHUFFLE_WRITE_TIME, ""), (HEADER_SHUFFLE_WRITE_SIZE, ""))
} else {
Nil
}} ++
{if (hasBytesSpilled(stage)) {
- Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
+ Seq((HEADER_MEM_SPILL, ""), (HEADER_DISK_SPILL, ""))
} else {
Nil
}} ++
- Seq(("Errors", ""))
+ Seq((HEADER_ERROR, ""))
if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
throw new IllegalArgumentException(s"Unknown column: $sortColumn")
@@ -953,35 +955,62 @@ private[ui] class TaskPagedTable(
}
}
-private object ApiHelper {
-
-
- private val COLUMN_TO_INDEX = Map(
- "ID" -> null.asInstanceOf[String],
- "Index" -> TaskIndexNames.TASK_INDEX,
- "Attempt" -> TaskIndexNames.ATTEMPT,
- "Status" -> TaskIndexNames.STATUS,
- "Locality Level" -> TaskIndexNames.LOCALITY,
- "Executor ID / Host" -> TaskIndexNames.EXECUTOR,
- "Launch Time" -> TaskIndexNames.LAUNCH_TIME,
- "Duration" -> TaskIndexNames.DURATION,
- "Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY,
- "Task Deserialization Time" -> TaskIndexNames.DESER_TIME,
- "GC Time" -> TaskIndexNames.GC_TIME,
- "Result Serialization Time" -> TaskIndexNames.SER_TIME,
- "Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME,
- "Peak Execution Memory" -> TaskIndexNames.PEAK_MEM,
- "Accumulators" -> TaskIndexNames.ACCUMULATORS,
- "Input Size / Records" -> TaskIndexNames.INPUT_SIZE,
- "Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE,
- "Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME,
- "Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS,
- "Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS,
- "Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME,
- "Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
- "Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL,
- "Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL,
- "Errors" -> TaskIndexNames.ERROR)
+private[ui] object ApiHelper {
+
+ val HEADER_ID = "ID"
+ val HEADER_TASK_INDEX = "Index"
+ val HEADER_ATTEMPT = "Attempt"
+ val HEADER_STATUS = "Status"
+ val HEADER_LOCALITY = "Locality Level"
+ val HEADER_EXECUTOR = "Executor ID"
+ val HEADER_HOST = "Host"
+ val HEADER_LAUNCH_TIME = "Launch Time"
+ val HEADER_DURATION = "Duration"
+ val HEADER_SCHEDULER_DELAY = "Scheduler Delay"
+ val HEADER_DESER_TIME = "Task Deserialization Time"
+ val HEADER_GC_TIME = "GC Time"
+ val HEADER_SER_TIME = "Result Serialization Time"
+ val HEADER_GETTING_RESULT_TIME = "Getting Result Time"
+ val HEADER_PEAK_MEM = "Peak Execution Memory"
+ val HEADER_ACCUMULATORS = "Accumulators"
+ val HEADER_INPUT_SIZE = "Input Size / Records"
+ val HEADER_OUTPUT_SIZE = "Output Size / Records"
+ val HEADER_SHUFFLE_READ_TIME = "Shuffle Read Blocked Time"
+ val HEADER_SHUFFLE_TOTAL_READS = "Shuffle Read Size / Records"
+ val HEADER_SHUFFLE_REMOTE_READS = "Shuffle Remote Reads"
+ val HEADER_SHUFFLE_WRITE_TIME = "Write Time"
+ val HEADER_SHUFFLE_WRITE_SIZE = "Shuffle Write Size / Records"
+ val HEADER_MEM_SPILL = "Shuffle Spill (Memory)"
+ val HEADER_DISK_SPILL = "Shuffle Spill (Disk)"
+ val HEADER_ERROR = "Errors"
+
+ private[ui] val COLUMN_TO_INDEX = Map(
+ HEADER_ID -> null.asInstanceOf[String],
+ HEADER_TASK_INDEX -> TaskIndexNames.TASK_INDEX,
+ HEADER_ATTEMPT -> TaskIndexNames.ATTEMPT,
+ HEADER_STATUS -> TaskIndexNames.STATUS,
+ HEADER_LOCALITY -> TaskIndexNames.LOCALITY,
+ HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR,
+ HEADER_HOST -> TaskIndexNames.HOST,
+ HEADER_LAUNCH_TIME -> TaskIndexNames.LAUNCH_TIME,
+ HEADER_DURATION -> TaskIndexNames.DURATION,
+ HEADER_SCHEDULER_DELAY -> TaskIndexNames.SCHEDULER_DELAY,
+ HEADER_DESER_TIME -> TaskIndexNames.DESER_TIME,
+ HEADER_GC_TIME -> TaskIndexNames.GC_TIME,
+ HEADER_SER_TIME -> TaskIndexNames.SER_TIME,
+ HEADER_GETTING_RESULT_TIME -> TaskIndexNames.GETTING_RESULT_TIME,
+ HEADER_PEAK_MEM -> TaskIndexNames.PEAK_MEM,
+ HEADER_ACCUMULATORS -> TaskIndexNames.ACCUMULATORS,
+ HEADER_INPUT_SIZE -> TaskIndexNames.INPUT_SIZE,
+ HEADER_OUTPUT_SIZE -> TaskIndexNames.OUTPUT_SIZE,
+ HEADER_SHUFFLE_READ_TIME -> TaskIndexNames.SHUFFLE_READ_TIME,
+ HEADER_SHUFFLE_TOTAL_READS -> TaskIndexNames.SHUFFLE_TOTAL_READS,
+ HEADER_SHUFFLE_REMOTE_READS -> TaskIndexNames.SHUFFLE_REMOTE_READS,
+ HEADER_SHUFFLE_WRITE_TIME -> TaskIndexNames.SHUFFLE_WRITE_TIME,
+ HEADER_SHUFFLE_WRITE_SIZE -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
+ HEADER_MEM_SPILL -> TaskIndexNames.MEM_SPILL,
+ HEADER_DISK_SPILL -> TaskIndexNames.DISK_SPILL,
+ HEADER_ERROR -> TaskIndexNames.ERROR)
def hasAccumulators(stageData: StageData): Boolean = {
stageData.accumulatorUpdates.exists { acc => acc.name != null && acc.value != null }
http://git-wip-us.apache.org/repos/asf/spark/blob/75bb19a0/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 0aeddf7..6044563 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -28,13 +28,74 @@ import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1.{AccumulableInfo => UIAccumulableInfo, StageData, StageStatus}
import org.apache.spark.status.config._
-import org.apache.spark.ui.jobs.{StagePage, StagesTab}
+import org.apache.spark.ui.jobs.{ApiHelper, StagePage, StagesTab, TaskPagedTable}
class StagePageSuite extends SparkFunSuite with LocalSparkContext {
private val peakExecutionMemory = 10
+ test("ApiHelper.COLUMN_TO_INDEX should match headers of the task table") {
+ val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+ val statusStore = AppStatusStore.createLiveStore(conf)
+ try {
+ val stageData = new StageData(
+ status = StageStatus.ACTIVE,
+ stageId = 1,
+ attemptId = 1,
+ numTasks = 1,
+ numActiveTasks = 1,
+ numCompleteTasks = 1,
+ numFailedTasks = 1,
+ numKilledTasks = 1,
+ numCompletedIndices = 1,
+
+ executorRunTime = 1L,
+ executorCpuTime = 1L,
+ submissionTime = None,
+ firstTaskLaunchedTime = None,
+ completionTime = None,
+ failureReason = None,
+
+ inputBytes = 1L,
+ inputRecords = 1L,
+ outputBytes = 1L,
+ outputRecords = 1L,
+ shuffleReadBytes = 1L,
+ shuffleReadRecords = 1L,
+ shuffleWriteBytes = 1L,
+ shuffleWriteRecords = 1L,
+ memoryBytesSpilled = 1L,
+ diskBytesSpilled = 1L,
+
+ name = "stage1",
+ description = Some("description"),
+ details = "detail",
+ schedulingPool = "pool1",
+
+ rddIds = Seq(1),
+ accumulatorUpdates = Seq(new UIAccumulableInfo(0L, "acc", None, "value")),
+ tasks = None,
+ executorSummary = None,
+ killedTasksSummary = Map.empty
+ )
+ val taskTable = new TaskPagedTable(
+ stageData,
+ basePath = "/a/b/c",
+ currentTime = 0,
+ pageSize = 10,
+ sortColumn = "Index",
+ desc = false,
+ store = statusStore
+ )
+ val columnNames = (taskTable.headers \ "th" \ "a").map(_.child(1).text).toSet
+ assert(columnNames === ApiHelper.COLUMN_TO_INDEX.keySet)
+ } finally {
+ statusStore.close()
+ }
+ }
+
test("peak execution memory should displayed") {
val html = renderStagePage().toString().toLowerCase(Locale.ROOT)
val targetString = "peak execution memory"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org