Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/13 17:11:55 UTC

[GitHub] asfgit closed pull request #23068: [SPARK-26098][WebUI] Show associated SQL query in Job page

asfgit closed pull request #23068: [SPARK-26098][WebUI] Show associated SQL query in Job page
URL: https://github.com/apache/spark/pull/23068
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff once a foreign (forked) pull request
is merged, it is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index bd3f58b6182c0..262ff6547faa5 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -70,6 +70,8 @@ private[spark] class AppStatusListener(
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
+
+  private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
   // Keep the active executor count as a separate variable to avoid having to do synchronization
   // around liveExecutors.
   @volatile private var activeExecutorCount = 0
@@ -318,6 +320,8 @@ private[spark] class AppStatusListener(
     val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
     val jobGroup = Option(event.properties)
       .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
+    val sqlExecutionId = Option(event.properties)
+      .flatMap(p => Option(p.getProperty(SQL_EXECUTION_ID_KEY)).map(_.toLong))
 
     val job = new LiveJob(
       event.jobId,
@@ -325,7 +329,8 @@ private[spark] class AppStatusListener(
       if (event.time > 0) Some(new Date(event.time)) else None,
       event.stageIds,
       jobGroup,
-      numTasks)
+      numTasks,
+      sqlExecutionId)
     liveJobs.put(event.jobId, job)
     liveUpdate(job, now)
 
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index b35781cb36e81..312bcccb1cca1 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -56,6 +56,13 @@ private[spark] class AppStatusStore(
     store.read(classOf[JobDataWrapper], jobId).info
   }
 
+  // Returns job data and associated SQL execution ID of certain Job ID.
+  // If there is no related SQL execution, the SQL execution ID part will be None.
+  def jobWithAssociatedSql(jobId: Int): (v1.JobData, Option[Long]) = {
+    val data = store.read(classOf[JobDataWrapper], jobId)
+    (data.info, data.sqlExecutionId)
+  }
+
   def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = {
     val base = store.view(classOf[ExecutorSummaryWrapper])
     val filtered = if (activeOnly) {
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 47e45a66ecccb..7f7b83a54d794 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -64,7 +64,8 @@ private class LiveJob(
     val submissionTime: Option[Date],
     val stageIds: Seq[Int],
     jobGroup: Option[String],
-    numTasks: Int) extends LiveEntity {
+    numTasks: Int,
+    sqlExecutionId: Option[Long]) extends LiveEntity {
 
   var activeTasks = 0
   var completedTasks = 0
@@ -108,7 +109,7 @@ private class LiveJob(
       skippedStages.size,
       failedStages,
       killedSummary)
-    new JobDataWrapper(info, skippedStages)
+    new JobDataWrapper(info, skippedStages, sqlExecutionId)
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index ef19e86f3135f..eea47b3b17098 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -68,7 +68,8 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) {
  */
 private[spark] class JobDataWrapper(
     val info: JobData,
-    val skippedStages: Set[Int]) {
+    val skippedStages: Set[Int],
+    val sqlExecutionId: Option[Long]) {
 
   @JsonIgnore @KVIndex
   private def id: Int = info.jobId
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 55444a2c0c9ab..b58a6ca447edf 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -189,7 +189,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
     require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
 
     val jobId = parameterId.toInt
-    val jobData = store.asOption(store.job(jobId)).getOrElse {
+    val (jobData, sqlExecutionId) = store.asOption(store.jobWithAssociatedSql(jobId)).getOrElse {
       val content =
         <div id="no-info">
           <p>No information to display for job {jobId}</p>
@@ -197,6 +197,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
       return UIUtils.headerSparkPage(
         request, s"Details for Job $jobId", content, parent)
     }
+
     val isComplete = jobData.status != JobExecutionStatus.RUNNING
     val stages = jobData.stageIds.map { stageId =>
       // This could be empty if the listener hasn't received information about the
@@ -278,6 +279,17 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
             <Strong>Status:</Strong>
             {jobData.status}
           </li>
+          {
+            if (sqlExecutionId.isDefined) {
+              <li>
+                <strong>Associated SQL Query: </strong>
+                {<a href={"%s/SQL/execution/?id=%s".format(
+                  UIUtils.prependBaseUri(request, parent.basePath),
+                  sqlExecutionId.get)
+                }>{sqlExecutionId.get}</a>}
+              </li>
+            }
+          }
           {
             if (jobData.jobGroup.isDefined) {
               <li>
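The "spark.sql.execution.id" key read above is the local property that Spark SQL's
SQLExecution sets around query executions; the submitting thread's local properties
are attached to SparkListenerJobStart, which is where AppStatusListener picks the id
up. As a rough illustration of that extraction (the standalone listener class below
is hypothetical and not part of this patch; the Option-based parsing mirrors the
AppStatusListener change):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

  // Hypothetical listener, shown only to illustrate how the SQL execution id
  // can be read from the job-start event's properties.
  class SqlExecutionIdListener extends SparkListener {
    private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"

    override def onJobStart(event: SparkListenerJobStart): Unit = {
      // event.properties may be null, hence the Option wrapping; the property
      // value is the SQL execution id rendered as a string.
      val sqlExecutionId: Option[Long] = Option(event.properties)
        .flatMap(p => Option(p.getProperty(SQL_EXECUTION_ID_KEY)).map(_.toLong))
      sqlExecutionId.foreach { id =>
        println(s"Job ${event.jobId} belongs to SQL execution $id")
      }
    }
  }

A listener like this could be registered with SparkContext.addSparkListener; the real
AppStatusListener is registered internally by Spark.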

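For completeness, a rough sketch of how the new AppStatusStore.jobWithAssociatedSql
method might be consumed outside JobPage (the object and helper names are hypothetical,
and since AppStatusStore is private[spark] such code would have to live under the
org.apache.spark package):

  package org.apache.spark.example  // hypothetical location, for private[spark] access

  import org.apache.spark.status.AppStatusStore

  object JobSqlLookup {
    // Hypothetical helper: describe a job and its associated SQL execution, if any.
    // jobWithAssociatedSql throws NoSuchElementException for an unknown job id,
    // which is why JobPage wraps the call in store.asOption above.
    def describeJob(store: AppStatusStore, jobId: Int): String = {
      val (jobData, sqlExecutionId) = store.jobWithAssociatedSql(jobId)
      sqlExecutionId match {
        case Some(id) => s"Job ${jobData.jobId} was issued by SQL execution $id"
        case None     => s"Job ${jobData.jobId} has no associated SQL query"
      }
    }
  }

On the Job page itself, the Some(id) case corresponds to the new "Associated SQL Query"
entry, rendered as a link to <base>/SQL/execution/?id=<id>.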

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org