You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/10/07 01:51:07 UTC

spark git commit: [SPARK-10885] [STREAMING] Display the failed output op in Streaming UI

Repository: spark
Updated Branches:
  refs/heads/master 5e035403d -> ffe6831e4


[SPARK-10885] [STREAMING] Display the failed output op in Streaming UI

This PR implements the following features for both `master` and `branch-1.5`.
1. Display the failed output op count in the batch list
2. Display the failure reason of output op in the batch detail page

Screenshots:
<img width="1356" alt="1" src="https://cloud.githubusercontent.com/assets/1000778/10198387/5b2b97ec-67ce-11e5-81c2-f818b9d2f3ad.png">
<img width="1356" alt="2" src="https://cloud.githubusercontent.com/assets/1000778/10198388/5b76ac14-67ce-11e5-8c8b-de2683c5b485.png">

There are still two remaining problems in the UI.
1. If an output operation doesn't run any spark job, we cannot get the its duration since now it's the sum of all jobs' durations.
2. If an output operation doesn't run any spark job, we cannot get the description since it's the latest job's call site.

We need to add new `StreamingListenerEvent` about output operations to fix them. So I'd like to fix them only for `master` in another PR.

Author: zsxwing <zs...@gmail.com>

Closes #8950 from zsxwing/batch-failure.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffe6831e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffe6831e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffe6831e

Branch: refs/heads/master
Commit: ffe6831e49e28eb855f857fdfa5dd99341e80c9d
Parents: 5e03540
Author: zsxwing <zs...@gmail.com>
Authored: Tue Oct 6 16:51:03 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Oct 6 16:51:03 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/scheduler/BatchInfo.scala   |  10 ++
 .../spark/streaming/scheduler/JobSet.scala      |   1 +
 .../spark/streaming/ui/AllBatchesTable.scala    |  15 ++-
 .../apache/spark/streaming/ui/BatchPage.scala   | 134 ++++++++++++++++---
 .../apache/spark/streaming/ui/BatchUIData.scala |   6 +-
 .../spark/streaming/UISeleniumSuite.scala       |   4 +-
 6 files changed, 143 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6831e/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 3c86956..463f899 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -41,6 +41,8 @@ case class BatchInfo(
 
   private var _failureReasons: Map[Int, String] = Map.empty
 
+  private var _numOutputOp: Int = 0
+
   @deprecated("Use streamIdToInputInfo instead", "1.5.0")
   def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
 
@@ -77,4 +79,12 @@ case class BatchInfo(
 
   /** Failure reasons corresponding to every output ops in the batch */
   private[streaming] def failureReasons = _failureReasons
+
+  /** Set the number of output operations in this batch */
+  private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = {
+    _numOutputOp = numOutputOp
+  }
+
+  /** Return the number of output operations in this batch */
+  private[streaming] def numOutputOp: Int = _numOutputOp
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6831e/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 255ccf0..08f63cc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -81,6 +81,7 @@ case class JobSet(
       if (processingEndTime >= 0) Some(processingEndTime) else None
     )
     binfo.setFailureReason(failureReasons)
+    binfo.setNumOutputOp(jobs.size)
     binfo
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6831e/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index f702bd5..3e6590d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -107,9 +107,10 @@ private[ui] class ActiveBatchTable(
 private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
   extends BatchTableBase("completed-batches-table", batchInterval) {
 
-  override protected def columns: Seq[Node] = super.columns ++
-    <th>Total Delay
-      {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
+  override protected def columns: Seq[Node] = super.columns ++ {
+    <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
+      <th>Output Ops: Succeeded/Total</th>
+  }
 
   override protected def renderRows: Seq[Node] = {
     batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
@@ -118,9 +119,17 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
   private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
     val totalDelay = batch.totalDelay
     val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+    val numFailedOutputOp = batch.failureReason.size
+    val outputOpColumn = if (numFailedOutputOp > 0) {
+        s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" +
+          s" (${numFailedOutputOp} failed)"
+      } else {
+        s"${batch.numOutputOp}/${batch.numOutputOp}"
+      }
     baseRow(batch) ++
       <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
         {formattedTotalDelay}
       </td>
+      <td>{outputOpColumn}</td>
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6831e/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 9129c1f..1b717b6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -38,6 +38,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     <th>Output Op Id</th>
       <th>Description</th>
       <th>Duration</th>
+      <th>Status</th>
       <th>Job Id</th>
       <th>Duration</th>
       <th class="sorttable_nosort">Stages: Succeeded/Total</th>
@@ -49,18 +50,42 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       outputOpId: OutputOpId,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
+      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: SparkJobIdWithUIData): Seq[Node] = {
     if (sparkJob.jobUIData.isDefined) {
       generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
-        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+        outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
     } else {
       generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
-        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+        outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
     }
   }
 
+  private def generateOutputOpRowWithoutSparkJobs(
+    outputOpId: OutputOpId,
+    outputOpDescription: Seq[Node],
+    formattedOutputOpDuration: String,
+    outputOpStatus: String): Seq[Node] = {
+    <tr>
+      <td class="output-op-id-cell" >{outputOpId.toString}</td>
+      <td>{outputOpDescription}</td>
+      <td>{formattedOutputOpDuration}</td>
+      {outputOpStatusCell(outputOpStatus, rowspan = 1)}
+      <!-- Job Id -->
+      <td>-</td>
+      <!-- Duration -->
+      <td>-</td>
+      <!-- Stages: Succeeded/Total -->
+      <td>-</td>
+      <!-- Tasks (for all stages): Succeeded/Total -->
+      <td>-</td>
+      <!-- Error -->
+      <td>-</td>
+    </tr>
+  }
+
   /**
    * Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
    * one cell, we use "rowspan" for the first row of a output op.
@@ -69,6 +94,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       outputOpId: OutputOpId,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
+      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: JobUIData): Seq[Node] = {
@@ -94,7 +120,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         <td rowspan={numSparkJobRowsInOutputOp.toString}>
           {outputOpDescription}
         </td>
-        <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+        <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
+        {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
       } else {
         Nil
       }
@@ -125,7 +152,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
             total = sparkJob.numTasks - sparkJob.numSkippedTasks)
         }
       </td>
-      {failureReasonCell(lastFailureReason)}
+      {failureReasonCell(lastFailureReason, rowspan = 1)}
     </tr>
   }
 
@@ -137,6 +164,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       outputOpId: OutputOpId,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
+      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       jobId: Int): Seq[Node] = {
@@ -147,7 +175,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       if (isFirstRow) {
         <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
           <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
-          <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+          <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
+          {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
       } else {
         Nil
       }
@@ -156,7 +185,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     <tr>
       {prefixCells}
       <td sorttable_customkey={jobId.toString}>
-        {jobId.toString}
+        {if (jobId >= 0) jobId.toString else "-"}
       </td>
       <!-- Duration -->
       <td>-</td>
@@ -170,7 +199,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   }
 
   private def generateOutputOpIdRow(
-      outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+      outputOpId: OutputOpId,
+      outputOpStatus: String,
+      sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
     // We don't count the durations of dropped jobs
     val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
       map(sparkJob => {
@@ -189,12 +220,32 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
 
     val description = generateOutputOpDescription(sparkJobs)
 
-    generateJobRow(
-      outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
-      sparkJobs.tail.map { sparkJob =>
+    if (sparkJobs.isEmpty) {
+      generateOutputOpRowWithoutSparkJobs(
+        outputOpId, description, formattedOutputOpDuration, outputOpStatus)
+    } else {
+      val firstRow =
         generateJobRow(
-          outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
-      }.flatMap(x => x)
+          outputOpId,
+          description,
+          formattedOutputOpDuration,
+          outputOpStatus,
+          sparkJobs.size,
+          true,
+          sparkJobs.head)
+      val tailRows =
+        sparkJobs.tail.map { sparkJob =>
+          generateJobRow(
+            outputOpId,
+            description,
+            formattedOutputOpDuration,
+            outputOpStatus,
+            sparkJobs.size,
+            false,
+            sparkJob)
+        }
+      (firstRow ++ tailRows).flatten
+    }
   }
 
   private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
@@ -228,7 +279,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     }
   }
 
-  private def failureReasonCell(failureReason: String): Seq[Node] = {
+  private def failureReasonCell(
+      failureReason: String,
+      rowspan: Int,
+      includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
     val isMultiline = failureReason.indexOf('\n') >= 0
     // Display the first line by default
     val failureReasonSummary = StringEscapeUtils.escapeHtml4(
@@ -237,6 +291,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       } else {
         failureReason
       })
+    val failureDetails =
+      if (isMultiline && !includeFirstLineInExpandDetails) {
+        // Skip the first line
+        failureReason.substring(failureReason.indexOf('\n') + 1)
+      } else {
+        failureReason
+      }
     val details = if (isMultiline) {
       // scalastyle:off
       <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
@@ -244,13 +305,20 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         +details
       </span> ++
         <div class="stacktrace-details collapsed">
-          <pre>{failureReason}</pre>
+          <pre>{failureDetails}</pre>
         </div>
       // scalastyle:on
     } else {
       ""
     }
-    <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+
+    if (rowspan == 1) {
+      <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+    } else {
+      <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
+        {failureReasonSummary}{details}
+      </td>
+    }
   }
 
   private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
@@ -265,16 +333,31 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
    * Generate the job table for the batch.
    */
   private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
-    val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
-      sortBy(_._1). // sorted by OutputOpId
+    val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
       map { case (outputOpId, outputOpIdAndSparkJobIds) =>
         // sort SparkJobIds for each OutputOpId
         (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
       }
+    val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId =>
+      val status = batchUIData.failureReason.get(outputOpId).map { failure =>
+        if (failure.startsWith("org.apache.spark.SparkException")) {
+          "Failed due to Spark job error\n" + failure
+        } else {
+          var nextLineIndex = failure.indexOf("\n")
+          if (nextLineIndex < 0) {
+            nextLineIndex = failure.size
+          }
+          val firstLine = failure.substring(0, nextLineIndex)
+          s"Failed due to error: $firstLine\n$failure"
+        }
+      }.getOrElse("Succeeded")
+      val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
+      (outputOpId, status, sparkJobIds)
+    }
     sparkListener.synchronized {
-      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
-        outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
-          (outputOpId,
+      val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] =
+        outputOps.map { case (outputOpId, status, sparkJobIds) =>
+          (outputOpId, status,
             sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
         }
 
@@ -285,7 +368,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         <tbody>
           {
             outputOpIdWithJobs.map {
-              case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
+              case (outputOpId, status, sparkJobIds) =>
+                generateOutputOpIdRow(outputOpId, status, sparkJobIds)
             }
           }
         </tbody>
@@ -386,4 +470,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
       replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
   }
+
+  private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = {
+    if (status == "Succeeded") {
+      <td rowspan={rowspan.toString}>Succeeded</td>
+    } else {
+      failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6831e/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index ae508c0..e6c2e21 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -30,6 +30,8 @@ private[ui] case class BatchUIData(
     val submissionTime: Long,
     val processingStartTime: Option[Long],
     val processingEndTime: Option[Long],
+    val numOutputOp: Int,
+    val failureReason: Map[Int, String],
     var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
 
   /**
@@ -69,7 +71,9 @@ private[ui] object BatchUIData {
       batchInfo.streamIdToInputInfo,
       batchInfo.submissionTime,
       batchInfo.processingStartTime,
-      batchInfo.processingEndTime
+      batchInfo.processingEndTime,
+      batchInfo.numOutputOp,
+      batchInfo.failureReasons
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6831e/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 068a6cb..d1df788 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -121,7 +121,7 @@ class UISeleniumSuite
         }
         findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
           List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
-            "Total Delay (?)")
+            "Total Delay (?)", "Output Ops: Succeeded/Total")
         }
 
         val batchLinks =
@@ -138,7 +138,7 @@ class UISeleniumSuite
         summaryText should contain ("Total delay:")
 
         findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be {
-          List("Output Op Id", "Description", "Duration", "Job Id", "Duration",
+          List("Output Op Id", "Description", "Duration", "Status", "Job Id", "Duration",
             "Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error")
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org