You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/04/21 10:08:56 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2201] Show ExecutionId when running status on query engine page

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fb93275b [KYUUBI #2201] Show ExecutionId when running status on query engine page
4fb93275b is described below

commit 4fb93275b111cbbf06eb454806b358e9ad8c447b
Author: sychen <sy...@trip.com>
AuthorDate: Thu Apr 21 18:08:42 2022 +0800

    [KYUUBI #2201] Show ExecutionId when running status on query engine page
    
    ### _Why are the changes needed?_
    
    Currently the ExecutionId is displayed on the query engine page, but generally only after the SQL has already finished.
    We can instead update it while the SQL is still running, as soon as an ExecutionId is available.
    
    close https://github.com/apache/incubator-kyuubi/issues/2201, https://github.com/apache/incubator-kyuubi/issues/921
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2202 from cxzl25/KYUUBI-2201.
    
    Closes #2201
    
    ed4f8b08 [sychen] compile time
    607a18c4 [sychen] import
    9096b6a8 [sychen] compiledTime
    abb0cc0f [sychen] add sync
    e1273e5b [sychen] use running state to judge
    3d413fe4 [sychen] fix style
    f619fe4b [sychen] use PARSING
    a6d91e24 [sychen] use SparkListenerSQLExecutionStart to get executionId
    4a036260 [sychen] set COMPILED state early
    60de793b [sychen] fix ut
    e2b879f0 [sychen] update ExecutionId
    
    Lead-authored-by: sychen <sy...@trip.com>
    Co-authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 .../engine/spark/operation/ExecuteStatement.scala  | 27 +++++++++++++++++++---
 .../apache/spark/kyuubi/SQLOperationListener.scala |  6 +++++
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 39308f288..630de7706 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -79,9 +79,6 @@ class ExecuteStatement(
       // TODO: Make it configurable
       spark.sparkContext.addSparkListener(operationListener)
       result = spark.sql(statement)
-      // TODO #921: COMPILED need consider eagerly executed commands
-      setState(OperationState.COMPILED)
-      debug(result.queryExecution)
       iter =
         if (incrementalCollect) {
           info("Execute in incremental collect mode")
@@ -97,6 +94,7 @@ class ExecuteStatement(
             new ArrayFetchIterator(result.take(resultMaxRows))
           }
         }
+      setCompiledStateIfNeeded()
       setState(OperationState.FINISHED)
     } catch {
       onError(cancel = true)
@@ -158,4 +156,27 @@ class ExecuteStatement(
     EventBus.post(
       SparkOperationEvent(this, operationListener.getExecutionId))
   }
+
+  def setCompiledStateIfNeeded(): Unit = synchronized {
+    if (getStatus.state == OperationState.RUNNING) {
+      val lastAccessCompiledTime =
+        if (result != null) {
+          val phase = result.queryExecution.tracker.phases
+          if (phase.contains("parsing") && phase.contains("planning")) {
+            val compiledTime = phase("planning").endTimeMs - phase("parsing").startTimeMs
+            lastAccessTime + compiledTime
+          } else {
+            0L
+          }
+        } else {
+          0L
+        }
+      super.setState(OperationState.COMPILED)
+      if (lastAccessCompiledTime > 0L) {
+        lastAccessTime = lastAccessCompiledTime
+      }
+      EventBus.post(
+        SparkOperationEvent(this, operationListener.getExecutionId))
+    }
+  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index 42b49ea2d..0b3141770 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -29,6 +29,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SQL_EXECUTION_ID_KEY
+import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
 import org.apache.kyuubi.operation.Operation
 import org.apache.kyuubi.operation.log.OperationLog
 
@@ -86,6 +87,11 @@ class SQLOperationListener(
       if (executionId.isEmpty) {
         executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
           .map(_.toLong)
+        operation match {
+          case executeStatement: ExecuteStatement =>
+            executeStatement.setCompiledStateIfNeeded()
+          case _ =>
+        }
       }
       withOperationLog {
         activeJobs.add(jobId)