You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/04/21 10:08:56 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2201] Show ExecutionId when running status on query engine page
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4fb93275b [KYUUBI #2201] Show ExecutionId when running status on query engine page
4fb93275b is described below
commit 4fb93275b111cbbf06eb454806b358e9ad8c447b
Author: sychen <sy...@trip.com>
AuthorDate: Thu Apr 21 18:08:42 2022 +0800
[KYUUBI #2201] Show ExecutionId when running status on query engine page
### _Why are the changes needed?_
Currently the ExecutionId is displayed on the query engine page, but generally only after the SQL has already finished.
With this change, it is updated while the SQL is still running, as soon as an ExecutionId is available.
close https://github.com/apache/incubator-kyuubi/issues/2201, https://github.com/apache/incubator-kyuubi/issues/921
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2202 from cxzl25/KYUUBI-2201.
Closes #2201
ed4f8b08 [sychen] compile time
607a18c4 [sychen] import
9096b6a8 [sychen] compiledTime
abb0cc0f [sychen] add sync
e1273e5b [sychen] use running state to judge
3d413fe4 [sychen] fix style
f619fe4b [sychen] use PARSING
a6d91e24 [sychen] use SparkListenerSQLExecutionStart to get executionId
4a036260 [sychen] set COMPILED state early
60de793b [sychen] fix ut
e2b879f0 [sychen] update ExecutionId
Lead-authored-by: sychen <sy...@trip.com>
Co-authored-by: sychen <sy...@ctrip.com>
Signed-off-by: ulysses-you <ul...@apache.org>
---
.../engine/spark/operation/ExecuteStatement.scala | 27 +++++++++++++++++++---
.../apache/spark/kyuubi/SQLOperationListener.scala | 6 +++++
2 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 39308f288..630de7706 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -79,9 +79,6 @@ class ExecuteStatement(
// TODO: Make it configurable
spark.sparkContext.addSparkListener(operationListener)
result = spark.sql(statement)
- // TODO #921: COMPILED need consider eagerly executed commands
- setState(OperationState.COMPILED)
- debug(result.queryExecution)
iter =
if (incrementalCollect) {
info("Execute in incremental collect mode")
@@ -97,6 +94,7 @@ class ExecuteStatement(
new ArrayFetchIterator(result.take(resultMaxRows))
}
}
+ setCompiledStateIfNeeded()
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
@@ -158,4 +156,27 @@ class ExecuteStatement(
EventBus.post(
SparkOperationEvent(this, operationListener.getExecutionId))
}
+
+ def setCompiledStateIfNeeded(): Unit = synchronized {
+ if (getStatus.state == OperationState.RUNNING) {
+ val lastAccessCompiledTime =
+ if (result != null) {
+ val phase = result.queryExecution.tracker.phases
+ if (phase.contains("parsing") && phase.contains("planning")) {
+ val compiledTime = phase("planning").endTimeMs - phase("parsing").startTimeMs
+ lastAccessTime + compiledTime
+ } else {
+ 0L
+ }
+ } else {
+ 0L
+ }
+ super.setState(OperationState.COMPILED)
+ if (lastAccessCompiledTime > 0L) {
+ lastAccessTime = lastAccessCompiledTime
+ }
+ EventBus.post(
+ SparkOperationEvent(this, operationListener.getExecutionId))
+ }
+ }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index 42b49ea2d..0b3141770 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -29,6 +29,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SQL_EXECUTION_ID_KEY
+import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.log.OperationLog
@@ -86,6 +87,11 @@ class SQLOperationListener(
if (executionId.isEmpty) {
executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
.map(_.toLong)
+ operation match {
+ case executeStatement: ExecuteStatement =>
+ executeStatement.setCompiledStateIfNeeded()
+ case _ =>
+ }
}
withOperationLog {
activeJobs.add(jobId)