Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/07/19 10:15:31 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #814] Event Tracking: For jobInfo

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new e7e77f9  [KYUUBI #814] Event Tracking: For jobInfo
e7e77f9 is described below

commit e7e77f9277ad5537b202676e7c116d3a806f81ca
Author: 张宇翔 <zh...@126.com>
AuthorDate: Mon Jul 19 18:15:15 2021 +0800

    [KYUUBI #814] Event Tracking: For jobInfo
    
    ### _Why are the changes needed?_
    
    Store all jobInfos for each statement in memory.
    - First, a jobInfo tells you how long the job ran, and its stageIds let you find which stage took the longest time.
    - Second, if the job failed (which the jobResult field tells you), you can use the stageIds to look up which stage caused the failure (see the sketch below).
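    
    As a minimal illustration, both answers can be read off a single KyuubiJobInfo. The summarize helper below is hypothetical and not part of this commit:
    
    ```scala
    import org.apache.spark.scheduler.JobSucceeded
    
    import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiJobInfo
    
    // jobResult stays null until the jobEnd event has been processed for this job.
    def summarize(jobInfo: KyuubiJobInfo): String = jobInfo.jobResult match {
      case null =>
        s"Job ${jobInfo.jobId} of query [${jobInfo.statementId}] is still running"
      case JobSucceeded =>
        val durationMs = jobInfo.endTime - jobInfo.startTime
        s"Job ${jobInfo.jobId} succeeded in $durationMs ms (${jobInfo.stageIds.length} stages)"
      case failure =>
        // Correlate stageIds with stage-level metrics to find the culprit stage.
        s"Job ${jobInfo.jobId} failed: $failure, stages [${jobInfo.stageIds.mkString(", ")}]"
    }
    ```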
    
    Some interfaces:
    - KyuubiJobInfo: a job's summary info; it contains statementId, startTime, endTime, jobResult and stageIds.
    - KyuubiStatementListener: a singleton listener used for collecting metrics about jobs, stages, executors and so on.
    - KyuubiStatementMonitor: stores the data in memory and dumps it to a file once a threshold is reached (see the sketch below).
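    
    Condensed, the flow this commit wires into SparkSQLEngineListener (shown in full in the diff below; the standalone class name here is illustrative) is roughly:
    
    ```scala
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
    
    import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY
    import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
    import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiJobInfo
    
    class JobTrackingListener extends SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        // The statement id rides on the job's local properties.
        val statementId = jobStart.properties.getProperty(KYUUBI_STATEMENT_ID_KEY)
        KyuubiStatementMonitor.putJobInfoIntoMap(
          KyuubiJobInfo(jobStart.jobId, statementId, jobStart.stageIds, jobStart.time))
      }
    
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        // Fills in endTime and jobResult for the matching jobId, if still in the map.
        KyuubiStatementMonitor.insertJobEndTimeAndResult(jobEnd)
      }
    }
    ```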
    
    Test case:
    externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request
    
    Closes #814 from zhang1002/branch-1.2_add-jobinfo.
    
    Closes #814
    
    7d8ece23 [张宇翔] 1. Change the log level from INFO to DEBUG 2. Code modify: avoid producing NullPointerException
    0ddd83a7 [张宇翔] 1. Remove waitForOperationToComplete(client, opHandle)
    bbc36bb3 [张宇翔] 1. Merge statementListener to sqlEngineListener
    741fcd0f [张宇翔] Add interval
    b61b9406 [张宇翔] Code changes
    cb6f92c4 [张宇翔] 1. Remove object-lock 2. Add eventually code
    c1a1d732 [张宇翔] 1. Remove volatile for jobInfo
    9547d49e [张宇翔] 1. Remove volatile for jobInfo
    970c0002 [张宇翔] 1. Add volatile for jobInfo
    62ecb53d [张宇翔] 1. Remove kyuubiJobInfoQueue 2. Modify some unit test 3. Add some code annotations
    db1de381 [张宇翔] 1. Add some log 2. Add some annotations
    8deb61f2 [张宇翔] change some test
    87d9c102 [张宇翔] 1. Event tracking: for jobInfo 2. Add unit test
    86c49ebc [张宇翔] Merge master branch
    a0a99b3a [张宇翔] Merge master branch
    f248bef7 [张宇翔] Merge remote-tracking branch 'upstream/master'
    5d3b9afb [张宇翔] Code optimization
    db3a0b6f [张宇翔] Format changes
    08d6d1fc [张宇翔] Modify some annotations
    f62f00e9 [张宇翔] Event Tracking: For job
    85025193 [张宇翔] Merge branch 'branch-1.2_spark-monitor' into branch-1.2_add-jobinfo
    3c8d9af1 [张宇翔] 1. change directory structure 2. code optimizing
    b4290bfa [张宇翔] Event tracking: For jobInfo
    fe1f7cce [张宇翔] Change directory structure
    5ffb54f3 [张宇翔] Add kyuubi-spark-monitor module for nightly.yml
    71d33b9a [张宇翔] Add some comment
    6b87dff0 [张宇翔] 1. Remove some unused code 2. Add some comment
    f364d433 [张宇翔] Format change: Add newline in KyuubiStatementInfo
    8bddb202 [张宇翔] 1. Remove the relationship between executionId and operationId 2. Get each state by the function: setState 3. Get this statement's physicalPlan in ExecuteStatement 4. Add sparkUser item 5. Remove java code
    f43b3c8f [张宇翔] merge master branch
    55522613 [张宇翔] Merge branch 'master' into branch-1.2_spark-monitor
    6f0be547 [张宇翔] format change
    bbfba274 [张宇翔] Event tracking
    5a62f586 [张宇翔] remove some unused conf
    b26345ac [张宇翔] Merge master branch and resolve conflict
    fdc12bbe [张宇翔] Event tracking: for kStatement 1. Store the relationship between executionId and operationId 2. Store the relationship between operationId and statement 3. Store the relationship between executionId and physicalPlan 4. Store each state and its happen time for this statement: initialized, running, finished
    d1676268 [张宇翔] event tracking: for SQLExecutionStart
    
    Authored-by: 张宇翔 <zh...@126.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../spark/monitor/KyuubiStatementMonitor.scala     | 82 ++++++++++++++++++++--
 .../spark/monitor/entity/KyuubiJobInfo.scala       | 45 ++++++++++++
 .../engine/spark/operation/ExecuteStatement.scala  |  8 +--
 .../apache/spark/kyuubi/SQLOperationListener.scala |  2 +-
 .../spark/kyuubi/SparkSQLEngineListener.scala      | 18 ++++-
 .../engine/spark/KyuubiStatementMonitorSuite.scala | 50 +++++++++++--
 .../scala/org/apache/kyuubi/KyuubiSparkUtils.scala | 24 +++++++
 7 files changed, 209 insertions(+), 20 deletions(-)

diff --git a/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala
index f33fbe9..178cca4 100644
--- a/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala
+++ b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala
@@ -17,14 +17,20 @@
 
 package org.apache.kyuubi.engine.spark.monitor
 
-import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
+
+import org.apache.spark.scheduler.SparkListenerJobEnd
 
 import org.apache.kyuubi.Logging
-import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo
+import org.apache.kyuubi.engine.spark.monitor.entity.{KyuubiJobInfo, KyuubiStatementInfo}
 
 // TODO: Thread Safe need to consider
 object KyuubiStatementMonitor extends Logging{
 
+  // TODO: Just for test. We will remove them in the future
+  private val maxCapacity: Int = 10
+  private val maxSize: Int = 7
+
   /**
    * This blockingQueue stores kyuubiStatementInfo.
    *
@@ -36,17 +42,34 @@ object KyuubiStatementMonitor extends Logging{
    *      b. this queue's current size
    */
   // TODO: Capacity should be made configurable
-  private val kyuubiStatementQueue = new ArrayBlockingQueue[KyuubiStatementInfo](10)
+  private val kyuubiStatementQueue = new ArrayBlockingQueue[KyuubiStatementInfo](maxCapacity)
+
+  /**
+   * This map stores the relationship between jobId and jobInfo.
+   * When a job has finished, all we can get from the jobEnd event is the jobId.
+   * So we need to maintain this mapping in order to store the endTime and jobResult
+   * once the job has finished.
+   *
+   * Key is jobId, value is KyuubiJobInfo.
+   *
+   * Notice:
+   *    1. There are two kinds of thresholds that trigger removing and dumping items from this map:
+   *      a. time
+   *      b. this map's current size
+   */
+  // TODO: Capacity should be made configurable
+  private val kyuubiJobIdToJobInfoMap = new ConcurrentHashMap[Int, KyuubiJobInfo](maxCapacity)
 
   /**
-   * This function is used for putting kyuubiStatementInfo into blockingQueue(statementQueue).
+   * This method is used for putting kyuubiStatementInfo into blockingQueue(statementQueue).
    * Every time we put an item into this queue, we should check this queue's current size first.
    * If the size has reached the threshold, we need to remove items from this queue first.
+   *
    * @param kyuubiStatementInfo
    */
   // TODO: Lack size type threshold and time type threshold
   def putStatementInfoIntoQueue(kyuubiStatementInfo: KyuubiStatementInfo): Unit = {
-    if (kyuubiStatementQueue.size() >= 7) {
+    if (kyuubiStatementQueue.size() >= maxSize) {
       removeAndDumpStatementInfoFromQueue()
     }
     val isSuccess = kyuubiStatementQueue.add(kyuubiStatementInfo)
@@ -55,7 +78,7 @@ object KyuubiStatementMonitor extends Logging{
   }
 
   /**
-   * This function is used for removing kyuubiStatementInfo from blockingQueue(statementQueue)
+   * This method is used for removing kyuubiStatementInfo from blockingQueue(statementQueue)
    * and dumping them to a file when a threshold is reached.
    */
   // TODO: Need to ensure those items have finished. If not, we should put them into this queue again.
@@ -63,4 +86,51 @@ object KyuubiStatementMonitor extends Logging{
     // TODO: Just for test
     kyuubiStatementQueue.clear()
   }
+
+  /**
+   * This method is used for putting kyuubiJobInfo into the hashMap (kyuubiJobIdToJobInfoMap),
+   * storing the mapping relationship between jobId and jobInfo.
+   * The reason we need to maintain this mapping
+   * is that we need to store the endTime and jobResult
+   * when the job has finished, but the jobEnd event carries nothing but the jobId.
+   *
+   * @param kyuubiJobInfo
+   */
+  // TODO: Lack size type threshold and time type threshold
+  def putJobInfoIntoMap(kyuubiJobInfo: KyuubiJobInfo): Unit = {
+    if (kyuubiJobIdToJobInfoMap.size() >= maxSize) {
+      removeAndDumpJobInfoFromMap()
+    }
+    // Put kyuubiJobInfo into kyuubiJobIdToJobInfoMap
+    kyuubiJobIdToJobInfoMap.put(kyuubiJobInfo.jobId, kyuubiJobInfo)
+  }
+
+  /**
+   * This method is used for removing kyuubiJobInfo from the hashMap (kyuubiJobIdToJobInfoMap)
+   * and dumping them to a file when a threshold is reached.
+   */
+  private def removeAndDumpJobInfoFromMap(): Unit = {
+    // TODO: Just for test
+    kyuubiJobIdToJobInfoMap.clear()
+  }
+
+  /**
+   * This method is used for inserting endTime and jobResult.
+   * Those fields can only be obtained once the job has finished.
+   *
+   * Notice:
+   *    1. Access to endTime and jobResult needs to be thread-safe.
+   *
+   * @param jobEnd
+   */
+  def insertJobEndTimeAndResult(jobEnd: SparkListenerJobEnd): Unit = {
+    val jobInfo = kyuubiJobIdToJobInfoMap.get(jobEnd.jobId)
+    if (jobInfo != null) {
+      jobInfo.endTime = jobEnd.time
+      jobInfo.jobResult = jobEnd.jobResult
+      debug(s"Job finished. Query [${jobInfo.statementId}]: JobId is [${jobInfo.jobId}]")
+    } else {
+      warn(s"JobStartEvent is lost. JobId is [${jobInfo.jobId}]")
+    }
+  }
 }
diff --git a/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/entity/KyuubiJobInfo.scala b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/entity/KyuubiJobInfo.scala
new file mode 100644
index 0000000..4cb599d
--- /dev/null
+++ b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/entity/KyuubiJobInfo.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.monitor.entity
+
+import org.apache.spark.scheduler.JobResult
+
+/**
+ * This class stores the basic data of a job.
+ * You can use statementId to get all jobs that belong to this statement.
+ * You can also use statementId and jobId to get all stages that belong to this job.
+ *
+ * Introduce:
+ *    1. From startTime and endTime, you can get how long the job ran,
+ *       and find which stage took the longest time via stageIds.
+ *    2. If the job failed, you can look up which stage caused the failure via stageIds.
+ *
+ * @param jobId
+ * @param statementId
+ * @param stageIds the ids of all stages launched by this job
+ * @param startTime
+ */
+case class KyuubiJobInfo(
+    jobId: Int,
+    statementId: String,
+    stageIds: Seq[Int],
+    startTime: Long) {
+
+  var endTime: Long = 0
+  var jobResult: JobResult = null
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 76bf7c9..11d3c88 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -44,7 +44,7 @@ class ExecuteStatement(
     queryTimeout: Long)
   extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {
 
-  import ExecuteStatement._
+  import org.apache.kyuubi.KyuubiSparkUtils._
 
   private val forceCancel =
     session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)
@@ -177,9 +177,3 @@ class ExecuteStatement(
     kyuubiStatementInfo.exception = opEx
   }
 }
-
-object ExecuteStatement {
-  final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
-  final val SPARK_SCHEDULER_POOL_KEY = "spark.scheduler.pool"
-  final val SPARK_SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
-}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index dfa91b8..6e289a0 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -23,8 +23,8 @@ import org.apache.spark.scheduler._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
 
+import org.apache.kyuubi.KyuubiSparkUtils._
 import org.apache.kyuubi.Logging
-import org.apache.kyuubi.engine.spark.operation.ExecuteStatement._
 import org.apache.kyuubi.operation.Operation
 import org.apache.kyuubi.operation.log.OperationLog
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
index 38b0882..099b605 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
@@ -24,11 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.annotation.tailrec
 
 import org.apache.spark.SparkException
-import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerApplicationEnd, SparkListenerJobEnd}
+import org.apache.spark.scheduler._
 
+import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
+import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiJobInfo
 import org.apache.kyuubi.ha.client.EngineServiceDiscovery
 import org.apache.kyuubi.service.{Serverable, ServiceState}
 
@@ -63,8 +66,19 @@ class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logg
     }
   }
 
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+    val statementId = jobStart.properties.getProperty(KYUUBI_STATEMENT_ID_KEY)
+    val kyuubiJobInfo = KyuubiJobInfo(
+      jobStart.jobId, statementId, jobStart.stageIds, jobStart.time)
+    KyuubiStatementMonitor.putJobInfoIntoMap(kyuubiJobInfo)
+    debug(s"Add jobStartInfo. Query [$statementId]: Job ${jobStart.jobId} started with " +
+      s"${jobStart.stageIds.length} stages")
+  }
+
   override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
-   jobEnd.jobResult match {
+    KyuubiStatementMonitor.insertJobEndTimeAndResult(jobEnd)
+    info(s"Job end. Job ${jobEnd.jobId} state is ${jobEnd.jobResult.toString}")
+    jobEnd.jobResult match {
      case JobFailed(e) if e != null =>
        val cause = findCause(e)
        var deregisterInfo: Option[String] = None
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
index f54ac52..a485184 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
@@ -17,16 +17,18 @@
 
 package org.apache.kyuubi.engine.spark
 
-import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
 
-import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq, TOperationHandle}
+import org.apache.hive.service.rpc.thrift._
 import org.apache.hive.service.rpc.thrift.TCLIService.Iface
 import org.apache.hive.service.rpc.thrift.TOperationState._
+import org.apache.spark.scheduler.JobSucceeded
 import org.scalatest.PrivateMethodTester
+import org.scalatest.time.SpanSugar._
 
 import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
-import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo
-import org.apache.kyuubi.operation.HiveJDBCTests
+import org.apache.kyuubi.engine.spark.monitor.entity.{KyuubiJobInfo, KyuubiStatementInfo}
+import org.apache.kyuubi.operation.{HiveJDBCTests, OperationHandle}
 
 class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
     with PrivateMethodTester {
@@ -76,6 +78,46 @@ class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
     }
   }
 
+  test("add kyuubiJobInfo into queue and remove them when threshold reached") {
+    val sql = "select timestamp'2021-06-01'"
+    val getJobMap = PrivateMethod[
+      ConcurrentHashMap[Int, KyuubiJobInfo]](Symbol("kyuubiJobIdToJobInfoMap"))()
+
+    val jobIdToJobInfoMap = KyuubiStatementMonitor.invokePrivate(getJobMap)
+    jobIdToJobInfoMap.clear()
+    withSessionHandle { (client, handle) =>
+      val req = new TExecuteStatementReq()
+      req.setSessionHandle(handle)
+      req.setStatement(sql)
+      val tExecuteStatementResp = client.ExecuteStatement(req)
+      val opHandle = tExecuteStatementResp.getOperationHandle
+
+      eventually(timeout(10.seconds), interval(100.milliseconds)) {
+        val elements = jobIdToJobInfoMap.elements()
+        while (elements.hasMoreElements) {
+          val kyuubiJobInfo = elements.nextElement()
+          assert(jobIdToJobInfoMap.size() === 1)
+          assert(kyuubiJobInfo.statementId === OperationHandle(opHandle).identifier.toString)
+          assert(kyuubiJobInfo.stageIds.length === 1)
+          assert(kyuubiJobInfo.jobResult === JobSucceeded)
+          assert(kyuubiJobInfo.endTime !== 0)
+        }
+      }
+
+      // Test that kyuubiJobIdToJobInfoMap is cleared once the threshold is reached.
+      // This mechanism is used for avoiding memory leaks.
+      (1 to 7).foreach { _ =>
+        val req = new TExecuteStatementReq()
+        req.setSessionHandle(handle)
+        req.setStatement(sql)
+        val tExecuteStatementResp = client.ExecuteStatement(req)
+        val operationHandle = tExecuteStatementResp.getOperationHandle
+        waitForOperationToComplete(client, operationHandle)
+      }
+      assert(jobIdToJobInfoMap.size() === 1)
+    }
+  }
+
   private def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
     val req = new TGetOperationStatusReq(op)
     var state = client.GetOperationStatus(req).getOperationState
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSparkUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSparkUtils.scala
new file mode 100644
index 0000000..964db85
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSparkUtils.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi
+
+object KyuubiSparkUtils {
+  final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
+  final val SPARK_SCHEDULER_POOL_KEY = "spark.scheduler.pool"
+  final val SPARK_SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
+}
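
For context, a hypothetical sketch of how KYUUBI_STATEMENT_ID_KEY travels from a statement to its Spark jobs. It assumes the engine tags jobs via Spark local properties on the submitting thread, which is what populates the jobStart.properties read in SparkSQLEngineListener above:

```scala
import org.apache.spark.sql.SparkSession

import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY

object StatementTaggingExample extends App {
  val spark = SparkSession.builder().master("local[2]").getOrCreate()

  // Local properties set on the submitting thread are attached to every job
  // it launches and surface in SparkListenerJobStart.properties.
  spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, "stmt-0001")
  spark.sql("select timestamp'2021-06-01'").collect()
  // Clear the tag so later jobs on this thread are not misattributed.
  spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null)

  spark.stop()
}
```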