You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/09/23 07:44:14 UTC

spark git commit: [SPARK-10652] [SPARK-10742] [STREAMING] Set meaningful job descriptions for all streaming jobs

Repository: spark
Updated Branches:
  refs/heads/master 558e9c7e6 -> 5548a2547


[SPARK-10652] [SPARK-10742] [STREAMING] Set meaningful job descriptions for all streaming jobs

Here is the screenshot after adding the job descriptions to threads that run receivers and the scheduler thread running the batch jobs.

## All jobs page
* Added job descriptions with links to relevant batch details page
![image](https://cloud.githubusercontent.com/assets/663212/9924165/cda4a372-5cb1-11e5-91ca-d43a32c699e9.png)

## All stages page
* Added stage descriptions with links to relevant batch details page
![image](https://cloud.githubusercontent.com/assets/663212/9923814/2cce266a-5cae-11e5-8a3f-dad84d06c50e.png)

## Streaming batch details page
* Added the +details link
![image](https://cloud.githubusercontent.com/assets/663212/9921977/24014a32-5c98-11e5-958e-457b6c38065b.png)

Author: Tathagata Das <ta...@gmail.com>

Closes #8791 from tdas/SPARK-10652.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5548a254
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5548a254
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5548a254

Branch: refs/heads/master
Commit: 5548a254755bb84edae2768b94ab1816e1b49b91
Parents: 558e9c7
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Sep 22 22:44:09 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Sep 22 22:44:09 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ui/UIUtils.scala     | 62 +++++++++++++++++-
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  | 14 +++--
 .../org/apache/spark/ui/jobs/StageTable.scala   |  7 +--
 .../org/apache/spark/ui/UIUtilsSuite.scala      | 66 ++++++++++++++++++++
 .../spark/streaming/StreamingContext.scala      |  4 +-
 .../streaming/scheduler/JobScheduler.scala      | 15 ++++-
 .../streaming/scheduler/ReceiverTracker.scala   |  5 +-
 .../apache/spark/streaming/ui/BatchPage.scala   | 33 ++++++----
 .../spark/streaming/StreamingContextSuite.scala |  2 +-
 9 files changed, 179 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5548a254/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index f2da417..21dc8f0 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.ui
 
 import java.text.SimpleDateFormat
-import java.util.{Locale, Date}
+import java.util.{Date, Locale}
 
-import scala.xml.{Node, Text, Unparsed}
+import scala.util.control.NonFatal
+import scala.xml._
+import scala.xml.transform.{RewriteRule, RuleTransformer}
 
 import org.apache.spark.Logging
 import org.apache.spark.ui.scope.RDDOperationGraph
@@ -395,4 +397,60 @@ private[spark] object UIUtils extends Logging {
     </script>
   }
 
+  /**
+   * Returns HTML rendering of a job or stage description. It will try to parse the string as HTML
+   * and make sure that it only contains anchors with root-relative links. Otherwise,
+   * the whole string will rendered as a simple escaped text.
+   *
+   * Note: In terms of security, only anchor tags with root relative links are supported. So any
+   * attempts to embed links outside Spark UI, or other tags like <script> will cause in the whole
+   * description to be treated as plain text.
+   */
+  def makeDescription(desc: String, basePathUri: String): NodeSeq = {
+    import scala.language.postfixOps
+
+    // If the description can be parsed as HTML and has only relative links, then render
+    // as HTML, otherwise render as escaped string
+    try {
+      // Try to load the description as unescaped HTML
+      val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
+
+      // Verify that this has only anchors and span (we are wrapping in span)
+      val allowedNodeLabels = Set("a", "span")
+      val illegalNodes = xml \\ "_"  filterNot { case node: Node =>
+        allowedNodeLabels.contains(node.label)
+      }
+      if (illegalNodes.nonEmpty) {
+        throw new IllegalArgumentException(
+          "Only HTML anchors allowed in job descriptions\n" +
+            illegalNodes.map { n => s"${n.label} in $n"}.mkString("\n\t"))
+      }
+
+      // Verify that all links are relative links starting with "/"
+      val allLinks =
+        xml \\ "a" flatMap { _.attributes } filter { _.key == "href" } map { _.value.toString }
+      if (allLinks.exists { ! _.startsWith ("/") }) {
+        throw new IllegalArgumentException(
+          "Links in job descriptions must be root-relative:\n" + allLinks.mkString("\n\t"))
+      }
+
+      // Prepend the relative links with basePathUri
+      val rule = new RewriteRule() {
+        override def transform(n: Node): Seq[Node] = {
+          n match {
+            case e: Elem if e \ "@href" nonEmpty =>
+              val relativePath = e.attribute("href").get.toString
+              val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}"
+              e % Attribute(null, "href", fullUri, Null)
+            case _ => n
+          }
+        }
+      }
+      new RuleTransformer(rule).transform(xml)
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Invalid job description: $desc ", e)
+        <span class="description-input">{desc}</span>
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5548a254/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index e72547d..041cd55 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -17,15 +17,15 @@
 
 package org.apache.spark.ui.jobs
 
-import scala.collection.mutable.{HashMap, ListBuffer}
-import scala.xml.{Node, NodeSeq, Unparsed, Utility}
-
 import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
+import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.xml._
+
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
+import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
 
 /** Page showing list of all ongoing and recently finished jobs */
 private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
@@ -224,6 +224,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
       }
       val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
       val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
+      val jobDescription = UIUtils.makeDescription(lastStageDescription, parent.basePath)
+
       val detailUrl =
         "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
       <tr id={"job-" + job.jobId}>
@@ -231,7 +233,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
           {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
         </td>
         <td>
-          <span class="description-input" title={lastStageDescription}>{lastStageDescription}</span>
+          {jobDescription}
           <a href={detailUrl} class="name-link">{lastStageName}</a>
         </td>
         <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>

http://git-wip-us.apache.org/repos/asf/spark/blob/5548a254/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 99812db..ea806d0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.ui.jobs
 
-import scala.xml.Node
-import scala.xml.Text
-
 import java.util.Date
 
+import scala.xml.{Node, Text}
+
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.scheduler.StageInfo
@@ -116,7 +115,7 @@ private[ui] class StageTableBase(
       stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
       desc <- stageData.description
     } yield {
-      <span class="description-input" title={desc}>{desc}</span>
+      UIUtils.makeDescription(desc, basePathUri)
     }
     <div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5548a254/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
new file mode 100644
index 0000000..2b693c1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import scala.xml.Elem
+
+import org.apache.spark.SparkFunSuite
+
+class UIUtilsSuite extends SparkFunSuite {
+  import UIUtils._
+
+  test("makeDescription") {
+    verify(
+      """test <a href="/link"> text </a>""",
+      <span class="description-input">test <a href="/link"> text </a></span>,
+      "Correctly formatted text with only anchors and relative links should generate HTML"
+    )
+
+    verify(
+      """test <a href="/link" text </a>""",
+      <span class="description-input">{"""test <a href="/link" text </a>"""}</span>,
+      "Badly formatted text should make the description be treated as a streaming instead of HTML"
+    )
+
+    verify(
+      """test <a href="link"> text </a>""",
+      <span class="description-input">{"""test <a href="link"> text </a>"""}</span>,
+      "Non-relative links should make the description be treated as a string instead of HTML"
+    )
+
+    verify(
+      """test<a><img></img></a>""",
+      <span class="description-input">{"""test<a><img></img></a>"""}</span>,
+      "Non-anchor elements should make the description be treated as a string instead of HTML"
+    )
+
+    verify(
+      """test <a href="/link"> text </a>""",
+      <span class="description-input">test <a href="base/link"> text </a></span>,
+      baseUrl = "base",
+      errorMsg = "Base URL should be prepended to html links"
+    )
+  }
+
+  private def verify(
+      desc: String, expected: Elem, errorMsg: String = "", baseUrl: String = ""): Unit = {
+    val generated = makeDescription(desc, baseUrl)
+    assert(generated.sameElements(expected),
+      s"\n$errorMsg\n\nExpected:\n$expected\nGenerated:\n$generated")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5548a254/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6720ba4..94fea63 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -200,6 +200,8 @@ class StreamingContext private[streaming] (
 
   private val startSite = new AtomicReference[CallSite](null)
 
+  private[streaming] def getStartSite(): CallSite = startSite.get()
+
   private var shutdownHookRef: AnyRef = _
 
   conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
@@ -744,7 +746,7 @@ object StreamingContext extends Logging {
         throw new IllegalStateException(
           "Only one StreamingContext may be started in this JVM. " +
             "Currently running StreamingContext was started at" +
-            activeContext.get.startSite.get.longForm)
+            activeContext.get.getStartSite().longForm)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5548a254/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 0cd3959..32d995d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -25,6 +25,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.Logging
 import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
+import org.apache.spark.streaming.ui.UIUtils
 import org.apache.spark.util.{EventLoop, ThreadUtils}
 
 
@@ -190,10 +191,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   private class JobHandler(job: Job) extends Runnable with Logging {
+    import JobScheduler._
+
     def run() {
-      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
-      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
       try {
+        val formattedTime = UIUtils.formatBatchTime(
+          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
+        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
+
+        ssc.sc.setJobDescription(
+          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
+        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
+        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
+
         // We need to assign `eventLoop` to a temp variable. Otherwise, because
         // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
         // it's possible that when `post` is called, `eventLoop` happens to null.

http://git-wip-us.apache.org/repos/asf/spark/blob/5548a254/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index f86fd44..204e614 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc._
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.receiver._
-import org.apache.spark.util.{ThreadUtils, SerializableConfiguration}
+import org.apache.spark.util.{Utils, ThreadUtils, SerializableConfiguration}
 
 
 /** Enumeration to identify current state of a Receiver */
@@ -554,6 +554,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
         }
       receiverRDD.setName(s"Receiver $receiverId")
+      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
+      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
+
       val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
         receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
       // We will keep restarting the receiver job until ReceiverTracker is stopped

http://git-wip-us.apache.org/repos/asf/spark/blob/5548a254/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 90d1b0f..9129c1f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,14 +19,14 @@ package org.apache.spark.streaming.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{NodeSeq, Node, Text, Unparsed}
+import scala.xml._
 
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.streaming.Time
-import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
-import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
+import org.apache.spark.streaming.ui.StreamingJobProgressListener.{OutputOpId, SparkJobId}
 import org.apache.spark.ui.jobs.UIData.JobUIData
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
 
 private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
 
@@ -207,16 +207,25 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
             sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
           }
         }
-    val lastStageData = lastStageInfo.flatMap { s =>
-      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
-    }
-
-    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
-    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+    lastStageInfo match {
+      case Some(stageInfo) =>
+        val details = if (stageInfo.details.nonEmpty) {
+          <span
+            onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+            class="expand-details">
+              +details
+          </span> ++
+          <div class="stage-details collapsed">
+            <pre>{stageInfo.details}</pre>
+          </div>
+        } else {
+          NodeSeq.Empty
+        }
 
-    <span class="description-input" title={lastStageDescription}>
-      {lastStageDescription}
-    </span> ++ Text(lastStageName)
+        <div> {stageInfo.name} {details} </div>
+      case None =>
+        Text("(Unknown)")
+    }
   }
 
   private def failureReasonCell(failureReason: String): Seq[Node] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/5548a254/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 3b9d0d1..c7a8771 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -204,7 +204,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
 
     // Verify streaming jobs have expected thread-local properties
     assert(jobGroupFound === null)
-    assert(jobDescFound === null)
+    assert(jobDescFound.contains("Streaming job from"))
     assert(jobInterruptFound === "false")
 
     // Verify current thread's thread-local properties have not changed


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org