Posted to commits@spark.apache.org by pw...@apache.org on 2014/06/26 23:03:32 UTC

git commit: [SPARK-2286][UI] Report exception/errors for failed tasks that are not ExceptionFailure

Repository: spark
Updated Branches:
  refs/heads/master 32a1ad753 -> 6587ef7c1


[SPARK-2286][UI] Report exception/errors for failed tasks that are not ExceptionFailure

Also added inline doc for each TaskEndReason.
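
For context: before this change, the web UI surfaced an error only when the
failure was an ExceptionFailure; fetch failures, lost executors, and killed
tasks all rendered as blank error cells. Every non-success TaskEndReason now
extends TaskFailedReason and exposes toErrorString, so any consumer can report
failures uniformly. A minimal sketch of that pattern from a custom listener
(TaskFailureLogger is a hypothetical name, not part of this patch):

import org.apache.spark.{Success, TaskFailedReason}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Logs a readable message for every failed task, whatever the failure type.
class TaskFailureLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    taskEnd.reason match {
      case Success => // nothing to report
      case failure: TaskFailedReason =>
        // toErrorString is the same string the web UI now displays.
        println(s"Task ${taskEnd.taskInfo.taskId} failed: ${failure.toErrorString}")
    }
  }
}

Register it with sc.addSparkListener(new TaskFailureLogger) to see the same
messages the UI shows.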

Author: Reynold Xin <rx...@apache.org>

Closes #1225 from rxin/SPARK-2286 and squashes the following commits:

6a7959d [Reynold Xin] Fix unit test failure.
cf9d5eb [Reynold Xin] Merge branch 'master' into SPARK-2286
a61fae1 [Reynold Xin] Move to line above ...
38c7391 [Reynold Xin] [SPARK-2286][UI] Report exception/errors for failed tasks that are not ExceptionFailure.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6587ef7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6587ef7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6587ef7c

Branch: refs/heads/master
Commit: 6587ef7c1783961e6ef250afa387271a1bd6e277
Parents: 32a1ad7
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jun 26 14:00:45 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jun 26 14:03:22 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskEndReason.scala  | 68 ++++++++++++++++++--
 .../apache/spark/scheduler/TaskSetManager.scala |  4 +-
 .../spark/ui/jobs/JobProgressListener.scala     | 17 ++---
 .../org/apache/spark/ui/jobs/StagePage.scala    | 12 +---
 .../org/apache/spark/ui/jobs/StageTable.scala   |  4 +-
 5 files changed, 77 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6587ef7c/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index a307491..5e8bd8c 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -30,27 +30,69 @@ import org.apache.spark.storage.BlockManagerId
 @DeveloperApi
 sealed trait TaskEndReason
 
+/**
+ * :: DeveloperApi ::
+ * Task succeeded.
+ */
 @DeveloperApi
 case object Success extends TaskEndReason
 
+/**
+ * :: DeveloperApi ::
+ * Various possible reasons why a task failed.
+ */
+@DeveloperApi
+sealed trait TaskFailedReason extends TaskEndReason {
+  /** Error message displayed in the web UI. */
+  def toErrorString: String
+}
+
+/**
+ * :: DeveloperApi ::
+ * A [[org.apache.spark.scheduler.ShuffleMapTask]] that completed successfully earlier, but we
+ * lost the executor before the stage completed. This means Spark needs to reschedule the task
+ * to be re-executed on a different executor.
+ */
 @DeveloperApi
-case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
+case object Resubmitted extends TaskFailedReason {
+  override def toErrorString: String = "Resubmitted (resubmitted due to lost executor)"
+}
 
+/**
+ * :: DeveloperApi ::
+ * Task failed to fetch shuffle data from a remote node. This probably means we have lost the
+ * remote executors the task is trying to fetch from, so the previous stage needs to be rerun.
+ */
 @DeveloperApi
 case class FetchFailed(
     bmAddress: BlockManagerId,
     shuffleId: Int,
     mapId: Int,
     reduceId: Int)
-  extends TaskEndReason
+  extends TaskFailedReason {
+  override def toErrorString: String = {
+    val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
+    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
+  }
+}
 
+/**
+ * :: DeveloperApi ::
+ * Task failed due to a runtime exception. This is the most common failure case and also captures
+ * user program exceptions.
+ */
 @DeveloperApi
 case class ExceptionFailure(
     className: String,
     description: String,
     stackTrace: Array[StackTraceElement],
     metrics: Option[TaskMetrics])
-  extends TaskEndReason
+  extends TaskFailedReason {
+  override def toErrorString: String = {
+    val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
+    s"$className ($description}\n$stackTraceString"
+  }
+}
 
 /**
  * :: DeveloperApi ::
@@ -58,10 +100,18 @@ case class ExceptionFailure(
  * it was fetched.
  */
 @DeveloperApi
-case object TaskResultLost extends TaskEndReason
+case object TaskResultLost extends TaskFailedReason {
+  override def toErrorString: String = "TaskResultLost (result lost from block manager)"
+}
 
+/**
+ * :: DeveloperApi ::
+ * Task was killed intentionally and needs to be rescheduled.
+ */
 @DeveloperApi
-case object TaskKilled extends TaskEndReason
+case object TaskKilled extends TaskFailedReason {
+  override def toErrorString: String = "TaskKilled (killed intentionally)"
+}
 
 /**
  * :: DeveloperApi ::
@@ -69,7 +119,9 @@ case object TaskKilled extends TaskEndReason
  * the task crashed the JVM.
  */
 @DeveloperApi
-case object ExecutorLostFailure extends TaskEndReason
+case object ExecutorLostFailure extends TaskFailedReason {
+  override def toErrorString: String = "ExecutorLostFailure (executor lost)"
+}
 
 /**
  * :: DeveloperApi ::
@@ -77,4 +129,6 @@ case object ExecutorLostFailure extends TaskEndReason
  * deserializing the task result.
  */
 @DeveloperApi
-case object UnknownReason extends TaskEndReason
+case object UnknownReason extends TaskFailedReason {
+  override def toErrorString: String = "UnknownReason"
+}
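
Because TaskEndReason and TaskFailedReason are sealed, a match on Success plus
TaskFailedReason covers every possible reason, and each failure knows how to
format itself. A minimal sketch (describe is a hypothetical helper, not code
from this patch):

import org.apache.spark.{Success, TaskEndReason, TaskFailedReason}

// Exhaustive over the sealed hierarchy: every reason is either Success
// or some TaskFailedReason carrying a UI-ready message.
def describe(reason: TaskEndReason): String = reason match {
  case Success => "Success"
  case failed: TaskFailedReason => failed.toErrorString
}

This is the shape JobProgressListener adopts below; it still matches
ExceptionFailure separately only because that case also carries task metrics.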

http://git-wip-us.apache.org/repos/asf/spark/blob/6587ef7c/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b5bcdd7..c0898f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -641,7 +641,9 @@ private[spark] class TaskSetManager(
       addPendingTask(index, readding=true)
     }
 
-    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
+    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage.
+    // The reason is that the next stage wouldn't be able to fetch the data from this dead
+    // executor, so we would need to rerun these tasks on other executors.
     if (tasks(0).isInstanceOf[ShuffleMapTask]) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index

http://git-wip-us.apache.org/repos/asf/spark/blob/6587ef7c/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index bfefe4d..381a544 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.jobs
 
 import scala.collection.mutable.{HashMap, ListBuffer}
 
-import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, Success}
+import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
@@ -51,6 +51,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   var totalShuffleRead = 0L
   var totalShuffleWrite = 0L
 
+  // TODO: Should probably consolidate all following into a single hash map.
   val stageIdToTime = HashMap[Int, Long]()
   val stageIdToShuffleRead = HashMap[Int, Long]()
   val stageIdToShuffleWrite = HashMap[Int, Long]()
@@ -183,17 +184,17 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
       // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
       tasksActive.remove(info.taskId)
 
-      val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+      val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
         taskEnd.reason match {
           case org.apache.spark.Success =>
             stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
             (None, Option(taskEnd.taskMetrics))
-          case e: ExceptionFailure =>
+          case e: ExceptionFailure =>  // Handle ExceptionFailure because we might have metrics
             stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
-            (Some(e), e.metrics)
-          case e: org.apache.spark.TaskEndReason =>
+            (Some(e.toErrorString), e.metrics)
+          case e: TaskFailedReason =>  // All other failure cases
             stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
-            (None, None)
+            (Some(e.toErrorString), None)
         }
 
       stageIdToTime.getOrElseUpdate(sid, 0L)
@@ -221,7 +222,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
       stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
 
       val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
-      taskMap(info.taskId) = new TaskUIData(info, metrics, failureInfo)
+      taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage)
       stageIdToTaskData(sid) = taskMap
     }
   }
@@ -256,7 +257,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 case class TaskUIData(
     taskInfo: TaskInfo,
     taskMetrics: Option[TaskMetrics] = None,
-    exception: Option[ExceptionFailure] = None)
+    errorMessage: Option[String] = None)
 
 private object JobProgressListener {
   val DEFAULT_POOL_NAME = "default"
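
With TaskUIData now storing a pre-rendered errorMessage: Option[String] rather
than an Option[ExceptionFailure], UI code no longer needs to know anything
about failure internals. A read-side sketch (dumpTaskErrors is hypothetical,
and assumes stageIdToTaskData is publicly readable, as declared in this file):

import org.apache.spark.ui.jobs.JobProgressListener

// Print every recorded task error; errorMessage is None for successful tasks.
def dumpTaskErrors(listener: JobProgressListener): Unit = {
  for {
    (stageId, tasks) <- listener.stageIdToTaskData
    (taskId, data) <- tasks
    error <- data.errorMessage
  } println(s"stage=$stageId task=$taskId:\n$error")
}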

http://git-wip-us.apache.org/repos/asf/spark/blob/6587ef7c/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 4bce472..8b65f06 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -210,10 +210,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
 
   def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
       (taskData: TaskUIData): Seq[Node] = {
-    def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
-      trace.map(e => <span style="display:block;">{e.toString}</span>)
-
-    taskData match { case TaskUIData(info, metrics, exception) =>
+    taskData match { case TaskUIData(info, metrics, errorMessage) =>
       val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
         else metrics.map(_.executorRunTime).getOrElse(1L)
       val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
@@ -283,12 +280,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           </td>
         }}
         <td>
-          {exception.map { e =>
-            <span>
-              {e.className} ({e.description})<br/>
-              {fmtStackTrace(e.stackTrace)}
-            </span>
-          }.getOrElse("")}
+          {errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}
         </td>
       </tr>
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6587ef7c/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index a3f824a..30971f7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -91,13 +91,13 @@ private[ui] class StageTableBase(
         {s.name}
       </a>
 
-    val details = if (s.details.nonEmpty) (
+    val details = if (s.details.nonEmpty) {
       <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
             class="expand-details">
         +show details
       </span>
       <pre class="stage-details collapsed">{s.details}</pre>
-    )
+    }
 
     listener.stageIdToDescription.get(s.stageId)
       .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)