You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/04/04 22:44:07 UTC
spark git commit: [SPARK-23838][WEBUI] Running SQL query is displayed as "completed" in SQL tab
Repository: spark
Updated Branches:
refs/heads/master cccaaa14a -> d8379e5bc
[SPARK-23838][WEBUI] Running SQL query is displayed as "completed" in SQL tab
## What changes were proposed in this pull request?
A running SQL query would appear as completed in the Spark UI:
![image1](https://user-images.githubusercontent.com/1097932/38170733-3d7cb00c-35bf-11e8-994c-43f2d4fa285d.png)
The query appears under "Completed queries", while the job page shows that it is still running Job 132.
![image2](https://user-images.githubusercontent.com/1097932/38170735-48f2c714-35bf-11e8-8a41-6fae23543c46.png)
After some time, the query still appears under "Completed queries" (while it is still running), but its "Duration" keeps increasing.
![image3](https://user-images.githubusercontent.com/1097932/38170737-50f87ea4-35bf-11e8-8b60-000f6f918964.png)
To reproduce, run a query with multiple jobs, e.g. TPC-DS q6.
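The cause is that execution updates are written to the kvstore only periodically, so the update recording a job start may never be written. The UI then sees no running job for the execution and lists it as completed. This patch forces a kvstore write when a job starts. It also treats any execution that has no completion time as running.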
## How was this patch tested?
Tested manually by running the job again and checking the SQL tab. The fix is simple.
Author: Gengliang Wang <ge...@databricks.com>
Closes #20955 from gengliangwang/jobCompleted.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8379e5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8379e5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8379e5b
Branch: refs/heads/master
Commit: d8379e5bc3629f4e8233ad42831bdaf68c24cfeb
Parents: cccaaa1
Author: Gengliang Wang <ge...@databricks.com>
Authored: Wed Apr 4 15:43:58 2018 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Apr 4 15:43:58 2018 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/execution/ui/AllExecutionsPage.scala | 3 ++-
.../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 6 ++++--
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d8379e5b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index e751ce3..5825287 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -39,7 +39,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()
sqlStore.executionsList().foreach { e =>
- val isRunning = e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING }
+ val isRunning = e.completionTime.isEmpty ||
+ e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING }
val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
if (isRunning) {
running += e
http://git-wip-us.apache.org/repos/asf/spark/blob/d8379e5b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 71e9f93..2b6bb48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -88,7 +88,7 @@ class SQLAppStatusListener(
exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
exec.stages ++= event.stageIds.toSet
- update(exec)
+ update(exec, force = true)
}
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
@@ -308,11 +308,13 @@ class SQLAppStatusListener(
})
}
- private def update(exec: LiveExecutionData): Unit = {
+ private def update(exec: LiveExecutionData, force: Boolean = false): Unit = {
val now = System.nanoTime()
if (exec.endEvents >= exec.jobs.size + 1) {
exec.write(kvstore, now)
liveExecutions.remove(exec.executionId)
+ } else if (force) {
+ exec.write(kvstore, now)
} else if (liveUpdatePeriodNs >= 0) {
if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
exec.write(kvstore, now)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org