You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/01/22 18:36:35 UTC

spark git commit: [SPARK-23121][CORE] Fix for UI becoming inaccessible for long-running streaming apps

Repository: spark
Updated Branches:
  refs/heads/master 4327ccf28 -> 446948af1


[SPARK-23121][CORE] Fix for UI becoming inaccessible for long-running streaming apps

## What changes were proposed in this pull request?

The allJobs and job pages attempt to use stage attempt and DAG visualization data from the store, but for long-running jobs these entries are not guaranteed to be retained, leading to exceptions when the pages are rendered.

To fix it `store.lastStageAttempt(stageId)` and `store.operationGraphForJob(jobId)` are wrapped in `store.asOption` and default values are used if the info is missing.

## How was this patch tested?

Manual testing of the UI, also using the test command reported in SPARK-23121:

./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount ./examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar /spark

Closes #20287

Author: Sandor Murakozi <sm...@gmail.com>

Closes #20330 from smurakozi/SPARK-23121.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/446948af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/446948af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/446948af

Branch: refs/heads/master
Commit: 446948af1d8dbc080a26a6eec6f743d338f1d12b
Parents: 4327ccf
Author: Sandor Murakozi <sm...@gmail.com>
Authored: Mon Jan 22 10:36:28 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Jan 22 10:36:28 2018 -0800

----------------------------------------------------------------------
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  | 24 +++++++++++---------
 .../org/apache/spark/ui/jobs/JobPage.scala      | 10 ++++++--
 .../org/apache/spark/ui/jobs/StagePage.scala    |  9 +++++---
 3 files changed, 27 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/446948af/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index e3b72f1..2b0f4ac 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -36,6 +36,9 @@ import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished jobs */
 private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("") {
+
+  import ApiHelper._
+
   private val JOBS_LEGEND =
     <div class="legend-area"><svg width="150px" height="85px">
       <rect class="succeeded-job-legend"
@@ -65,10 +68,9 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
     }.map { job =>
       val jobId = job.jobId
       val status = job.status
-      val jobDescription = store.lastStageAttempt(job.stageIds.max).description
-      val displayJobDescription = jobDescription
-        .map(UIUtils.makeDescription(_, "", plainText = true).text)
-        .getOrElse("")
+      val (_, lastStageDescription) = lastStageNameAndDescription(store, job)
+      val jobDescription = UIUtils.makeDescription(lastStageDescription, "", plainText = true).text
+
       val submissionTime = job.submissionTime.get.getTime()
       val completionTime = job.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis())
       val classNameByStatus = status match {
@@ -80,7 +82,7 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
 
       // The timeline library treats contents as HTML, so we have to escape them. We need to add
       // extra layers of escaping in order to embed this in a Javascript string literal.
-      val escapedDesc = Utility.escape(displayJobDescription)
+      val escapedDesc = Utility.escape(jobDescription)
       val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc)
       val jobEventJsonAsStr =
         s"""
@@ -430,6 +432,8 @@ private[ui] class JobDataSource(
     sortColumn: String,
     desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
 
+  import ApiHelper._
+
   // Convert JobUIData to JobTableRowData which contains the final contents to show in the table
   // so that we can avoid creating duplicate contents during sorting the data
   private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))
@@ -454,23 +458,21 @@ private[ui] class JobDataSource(
     val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
     val submissionTime = jobData.submissionTime
     val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-    val lastStageAttempt = store.lastStageAttempt(jobData.stageIds.max)
-    val lastStageDescription = lastStageAttempt.description.getOrElse("")
+    val (lastStageName, lastStageDescription) = lastStageNameAndDescription(store, jobData)
 
-    val formattedJobDescription =
-      UIUtils.makeDescription(lastStageDescription, basePath, plainText = false)
+    val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, plainText = false)
 
     val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)
 
     new JobTableRowData(
       jobData,
-      lastStageAttempt.name,
+      lastStageName,
       lastStageDescription,
       duration.getOrElse(-1),
       formattedDuration,
       submissionTime.map(_.getTime()).getOrElse(-1L),
       formattedSubmissionTime,
-      formattedJobDescription,
+      jobDescription,
       detailUrl
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/446948af/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index c27f30c..46f2a76 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -336,8 +336,14 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
     content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
       store.executorList(false), appStartTime)
 
-    content ++= UIUtils.showDagVizForJob(
-      jobId, store.operationGraphForJob(jobId))
+    val operationGraphContent = store.asOption(store.operationGraphForJob(jobId)) match {
+      case Some(operationGraph) => UIUtils.showDagVizForJob(jobId, operationGraph)
+      case None =>
+        <div id="no-info">
+          <p>No DAG visualization information to display for job {jobId}</p>
+        </div>
+    }
+    content ++= operationGraphContent
 
     if (shouldShowActiveStages) {
       content ++=

http://git-wip-us.apache.org/repos/asf/spark/blob/446948af/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 0eb3190..5c2b0c3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,12 +23,10 @@ import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable.{HashMap, HashSet}
-import scala.xml.{Elem, Node, Unparsed}
+import scala.xml.{Node, Unparsed}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.TaskLocality
 import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
@@ -1020,4 +1018,9 @@ private object ApiHelper {
     }
   }
 
+  def lastStageNameAndDescription(store: AppStatusStore, job: JobData): (String, String) = {
+    val stage = store.asOption(store.lastStageAttempt(job.stageIds.max))
+    (stage.map(_.name).getOrElse(""), stage.flatMap(_.description).getOrElse(job.name))
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org