Posted to commits@spark.apache.org by va...@apache.org on 2019/03/06 22:09:13 UTC

[spark] branch branch-2.4 updated: [SPARK-27019][SQL][WEBUI] onJobStart happens after onExecutionEnd shouldn't overwrite kvstore

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 35381dd  [SPARK-27019][SQL][WEBUI] onJobStart happens after onExecutionEnd shouldn't overwrite kvstore
35381dd is described below

commit 35381dd3dfdba644bbd52a397f321c7302b821c4
Author: Shahid <sh...@gmail.com>
AuthorDate: Wed Mar 6 14:02:30 2019 -0800

    [SPARK-27019][SQL][WEBUI] onJobStart happens after onExecutionEnd shouldn't overwrite kvstore
    
    ## What changes were proposed in this pull request?
    Currently, when events are reordered, in particular when an onJobStart event arrives after the onExecutionEnd event, the SQL page in the UI displays incorrectly (for example, in the test mentioned in the JIRA; the issue also occurs randomly when a TPCDS query fails due to a broadcast timeout, etc.).
    
    The reason is that, in SQLAppStatusListener, we remove the liveExecutions entry once the execution ends. So, if a jobStart event arrives after that, we create a new liveExecutions entry for that execId, which eventually overwrites the kvstore entry and the UI displays confusing data.
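    
    The fix consults the kvstore before minting a new live entry: check the live map first, then try to rehydrate from the kvstore, and only create a fresh entry if neither exists. Below is a minimal sketch of that lookup order, using simplified stand-ins (`Store`, `StoredExecution`, `LiveExecution`, `executionFor` are illustrative, not the real `KVStore`, `SQLExecutionUIData` and `LiveExecutionData` classes):
    
    ```scala
    import java.util.NoSuchElementException
    import java.util.concurrent.ConcurrentHashMap
    
    // Simplified stand-ins for the types used by SQLAppStatusListener.
    class StoredExecution(val executionId: Long, val jobCount: Int)
    class LiveExecution(val executionId: Long) { var endEvents: Int = 0 }
    
    class Store(data: Map[Long, StoredExecution]) {
      def read(executionId: Long): StoredExecution =
        data.getOrElse(executionId, throw new NoSuchElementException(executionId.toString))
    }
    
    class Listener(store: Store) {
      private val liveExecutions = new ConcurrentHashMap[Long, LiveExecution]()
    
      def executionFor(executionId: Long): LiveExecution =
        Option(liveExecutions.get(executionId))          // 1. still live?
          .orElse {
            try {
              // 2. Already written to the kvstore: rehydrate instead of
              // creating a blank entry that would overwrite the stored data.
              val stored = store.read(executionId)
              val live = new LiveExecution(executionId)
              // Mark the execution-end and previously stored job-end events
              // as already seen (mirrors the patch's jobs.size + 1).
              live.endEvents = stored.jobCount + 1
              liveExecutions.put(executionId, live)
              Some(live)
            } catch {
              case _: NoSuchElementException => None     // nothing stored yet
            }
          }
          .getOrElse {                                   // 3. genuinely new
            val created = new LiveExecution(executionId)
            liveExecutions.put(executionId, created)
            created
          }
    }
    ```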
    
    ## How was this patch tested?
    
    Added a UT. Also manually tested with the event log of the failed query provided in the JIRA.
    
    Before fix:
    ![screenshot from 2019-03-03 03-05-52](https://user-images.githubusercontent.com/23054875/53687929-53e2b800-3d61-11e9-9dca-620fa41e605c.png)
    
    After fix:
    ![screenshot from 2019-03-03 02-40-18](https://user-images.githubusercontent.com/23054875/53687928-4f1e0400-3d61-11e9-86aa-584646ac68f9.png)
    
    Closes #23939 from shahidki31/SPARK-27019.
    
    Authored-by: Shahid <sh...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
    (cherry picked from commit 62fd133f744ab2d1aa3c409165914b5940e4d328)
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../sql/execution/ui/SQLAppStatusListener.scala    | 45 +++++++++++++++++-----
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 30 +++++++++++++++
 2 files changed, 66 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index d254af4..6069da8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.ui
 
-import java.util.Date
+import java.util.{Date, NoSuchElementException}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Function
 
@@ -77,7 +77,29 @@ class SQLAppStatusListener(
 
     val executionId = executionIdString.toLong
     val jobId = event.jobId
-    val exec = getOrCreateExecution(executionId)
+    val exec = Option(liveExecutions.get(executionId))
+      .orElse {
+        try {
+          // Should not overwrite the kvstore with new entry, if it already has the SQLExecution
+          // data corresponding to the execId.
+          val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId)
+          val executionData = new LiveExecutionData(executionId)
+          executionData.description = sqlStoreData.description
+          executionData.details = sqlStoreData.details
+          executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
+          executionData.metrics = sqlStoreData.metrics
+          executionData.submissionTime = sqlStoreData.submissionTime
+          executionData.completionTime = sqlStoreData.completionTime
+          executionData.jobs = sqlStoreData.jobs
+          executionData.stages = sqlStoreData.stages
+          executionData.metricsValues = sqlStoreData.metricValues
+          executionData.endEvents = sqlStoreData.jobs.size + 1
+          liveExecutions.put(executionId, executionData)
+          Some(executionData)
+        } catch {
+          case _: NoSuchElementException => None
+        }
+      }.getOrElse(getOrCreateExecution(executionId))
 
     // Record the accumulator IDs for the stages of this job, so that the code that keeps
     // track of the metrics knows which accumulators to look at.
@@ -276,16 +298,20 @@ class SQLAppStatusListener(
       exec.endEvents += 1
       update(exec)
 
-      // Remove stale LiveStageMetrics objects for stages that are not active anymore.
-      val activeStages = liveExecutions.values().asScala.flatMap { other =>
-        if (other != exec) other.stages else Nil
-      }.toSet
-      stageMetrics.keySet().asScala
-        .filter(!activeStages.contains(_))
-        .foreach(stageMetrics.remove)
+      removeStaleMetricsData(exec)
     }
   }
 
+  private def removeStaleMetricsData(exec: LiveExecutionData): Unit = {
+    // Remove stale LiveStageMetrics objects for stages that are not active anymore.
+    val activeStages = liveExecutions.values().asScala.flatMap { other =>
+      if (other != exec) other.stages else Nil
+    }.toSet
+    stageMetrics.keySet().asScala
+      .filter(!activeStages.contains(_))
+      .foreach(stageMetrics.remove)
+  }
+
   private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
     val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
     Option(liveExecutions.get(executionId)).foreach { exec =>
@@ -312,6 +338,7 @@ class SQLAppStatusListener(
     val now = System.nanoTime()
     if (exec.endEvents >= exec.jobs.size + 1) {
       exec.write(kvstore, now)
+      removeStaleMetricsData(exec)
       liveExecutions.remove(exec.executionId)
     } else if (force) {
       exec.write(kvstore, now)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 02df45d..c5f3fe5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -384,6 +384,36 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     assertJobs(statusStore.execution(executionId), failed = Seq(0))
   }
 
+  test("onJobStart happens after onExecutionEnd shouldn't overwrite kvstore") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+    listener.onJobStart(SparkListenerJobStart(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      stageInfos = Seq(createStageInfo(0, 0)),
+      createProperties(executionId)))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+    listener.onJobEnd(SparkListenerJobEnd(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      JobFailed(new RuntimeException("Oops"))))
+
+    assert(listener.noLiveData())
+    assert(statusStore.execution(executionId).get.completionTime.nonEmpty)
+  }
+
   test("handle one execution with multiple jobs") {
     val statusStore = createStatusStore()
     val listener = statusStore.listener.get

