You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/12/19 09:42:29 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3975] Support to post batch session/operation event

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 40009722c [KYUUBI #3975] Support to post batch session/operation event
40009722c is described below

commit 40009722cbeb54d1cc1c6e2ac1948bfd6667fb98
Author: fwang12 <fw...@ebay.com>
AuthorDate: Mon Dec 19 17:42:20 2022 +0800

    [KYUUBI #3975] Support to post batch session/operation event
    
    ### _Why are the changes needed?_
    
    Support posting batch session and operation events.
    
    In addition, this patch makes the following improvements:
    - wrap `session.open` with `handleSessionException`
    - add `sessionType` to `KyuubiOperationEvent`
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3975 from turboFei/batch_session_event.
    
    Closes #3975
    
    d4bc151c5 [fwang12] save
    ba4ee50c6 [fwang12] respect session name
    9c8ff405b [fwang12] do not use session.name
    2231ae752 [fwang12] session name
    6fa8be581 [fwang12] session name
    a69fc25e9 [fwang12] save
    5832238d8 [fwang12] session type
    42723bd2d [fwang12] check opened time
    cfe537f60 [fwang12] do not treat launch engine as operation
    806be6dd9 [fwang12] update session event
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../kyuubi/events/KyuubiOperationEvent.scala       | 10 +++++---
 .../kyuubi/operation/BatchJobSubmission.scala      | 11 +++++++-
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    | 18 ++++++++++---
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  |  2 +-
 .../ServerJsonLoggingEventHandlerSuite.scala       | 30 +++++++++++++++++++++-
 5 files changed, 62 insertions(+), 9 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
index 34c23601f..74a3a3fad 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.events
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
+import org.apache.kyuubi.session.KyuubiSession
 
 /**
  * A [[KyuubiOperationEvent]] used to tracker the lifecycle of an operation at server side.
@@ -40,6 +41,7 @@ import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
  * @param exception: caught exception if have
  * @param sessionId the identifier of the parent session
  * @param sessionUser the authenticated client user
+ * @param sessionType the type of the parent session
  */
 case class KyuubiOperationEvent private (
     statementId: String,
@@ -53,7 +55,8 @@ case class KyuubiOperationEvent private (
     completeTime: Long,
     exception: Option[Throwable],
     sessionId: String,
-    sessionUser: String) extends KyuubiEvent {
+    sessionUser: String,
+    sessionType: String) extends KyuubiEvent {
 
   // operation events are partitioned by the date when the corresponding operations are
   // created.
@@ -67,7 +70,7 @@ object KyuubiOperationEvent {
    * Shorthand for instantiating a operation event with a [[KyuubiOperation]] instance
    */
   def apply(operation: KyuubiOperation): KyuubiOperationEvent = {
-    val session = operation.getSession
+    val session = operation.getSession.asInstanceOf[KyuubiSession]
     val status = operation.getStatus
     new KyuubiOperationEvent(
       operation.statementId,
@@ -81,6 +84,7 @@ object KyuubiOperationEvent {
       status.completed,
       status.exception,
       session.handle.identifier.toString,
-      session.user)
+      session.user,
+      session.sessionType.toString)
   }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 5912e0b00..e0eb7c05d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -29,10 +29,11 @@ import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, KillResponse, ProcBuilder}
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
+import org.apache.kyuubi.events.{EventBus, KyuubiOperationEvent}
 import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
-import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState}
+import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState, RUNNING}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.KyuubiBatchSessionImpl
@@ -60,6 +61,7 @@ class BatchJobSubmission(
     recoveryMetadata: Option[Metadata])
   extends KyuubiApplicationOperation(session) {
   import BatchJobSubmission._
+  EventBus.post(KyuubiOperationEvent(this))
 
   override def shouldRunAsync: Boolean = true
 
@@ -143,6 +145,13 @@ class BatchJobSubmission(
   private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
     if (state != CANCELED) {
       setState(newState)
+      applicationInfo.filter(_.id != null).foreach { ai =>
+        session.getSessionEvent.foreach(_.engineId = ai.id)
+      }
+      if (newState == RUNNING) {
+        session.onEngineOpened()
+      }
+      EventBus.post(KyuubiOperationEvent(this))
     }
   }
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index feccc8ee0..8e5830293 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -77,11 +77,14 @@ class KyuubiBatchSessionImpl(
       sessionManager.validateBatchConf(batchRequest.getConf.asScala.toMap)
   }
 
+  override lazy val name: Option[String] = Option(batchRequest.getName).orElse(
+    normalizedConf.get(KyuubiConf.SESSION_NAME.key))
+
   private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
     .newBatchJobSubmissionOperation(
       this,
       batchRequest.getBatchType,
-      batchRequest.getName,
+      name.orNull,
       batchRequest.getResource,
       batchRequest.getClassName,
       normalizedConf,
@@ -101,6 +104,7 @@ class KyuubiBatchSessionImpl(
   }
 
   private val sessionEvent = KyuubiSessionEvent(this)
+  recoveryMetadata.map(metadata => sessionEvent.engineId = metadata.engineId)
   EventBus.post(sessionEvent)
 
   override def getSessionEvent: Option[KyuubiSessionEvent] = {
@@ -119,7 +123,7 @@ class KyuubiBatchSessionImpl(
     }
   }
 
-  override def open(): Unit = {
+  override def open(): Unit = handleSessionException {
     MetricsSystem.tracing { ms =>
       ms.incCount(CONN_TOTAL)
       ms.incCount(MetricRegistry.name(CONN_OPEN, user))
@@ -136,7 +140,7 @@ class KyuubiBatchSessionImpl(
         state = OperationState.PENDING.toString,
         resource = batchRequest.getResource,
         className = batchRequest.getClassName,
-        requestName = batchRequest.getName,
+        requestName = name.orNull,
         requestConf = normalizedConf,
         requestArgs = batchRequest.getArgs.asScala,
         createTime = createTime,
@@ -152,6 +156,14 @@ class KyuubiBatchSessionImpl(
     super.open()
 
     runOperation(batchJobSubmissionOp)
+    sessionEvent.totalOperations += 1
+  }
+
+  private[kyuubi] def onEngineOpened(): Unit = {
+    if (sessionEvent.openedTime <= 0) {
+      sessionEvent.openedTime = System.currentTimeMillis()
+      EventBus.post(sessionEvent)
+    }
   }
 
   override def close(): Unit = {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index fb6aa60b8..5f6866bc8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -108,7 +108,7 @@ class KyuubiSessionImpl(
 
   private var _engineSessionHandle: SessionHandle = _
 
-  override def open(): Unit = {
+  override def open(): Unit = handleSessionException {
     MetricsSystem.tracing { ms =>
       ms.incCount(CONN_TOTAL)
       ms.incCount(MetricRegistry.name(CONN_OPEN, user))
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
index 8022b6cf3..7fca01efd 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
@@ -35,8 +35,10 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.OperationState._
 import org.apache.kyuubi.server.KyuubiServer
 import org.apache.kyuubi.service.ServiceState
+import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType}
 
-class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCTestHelper {
+class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCTestHelper
+  with BatchTestHelper {
 
   private val engineLogRoot = "file://" + Utils.createTempDir().toString
   private val serverLogRoot = "file://" + Utils.createTempDir().toString
@@ -116,6 +118,7 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT
         assert(res.getString("remoteSessionId") == "")
         assert(res.getLong("startTime") > 0)
         assert(res.getInt("totalOperations") == 0)
+        assert(res.getString("sessionType") === SessionType.SQL.toString)
         assert(res.next())
         assert(res.getInt("totalOperations") == 0)
         assert(res.getString("sessionId") == sid)
@@ -123,10 +126,35 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT
         assert(res.getLong("openedTime") > 0)
         assert(res.next())
         assert(res.getInt("totalOperations") == 1)
+        assert(res.getString("sessionType") === SessionType.SQL.toString)
         assert(res.getLong("endTime") > 0)
         assert(!res.next())
       }
     }
+
+    val batchRequest = newSparkBatchRequest()
+    val sessionMgr = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
+    val batchSessionHandle = sessionMgr.openBatchSession(
+      Utils.currentUser,
+      "kyuubi",
+      "127.0.0.1",
+      Map.empty,
+      batchRequest)
+    withSessionConf()(Map.empty)(Map("spark.sql.shuffle.partitions" -> "2")) {
+      withJdbcStatement() { statement =>
+        val res = statement.executeQuery(
+          s"SELECT * FROM `json`.`$serverSessionEventPath` " +
+            s"where sessionName = '${sparkBatchTestAppName}' order by totalOperations")
+        assert(res.next())
+        assert(res.getString("user") == Utils.currentUser)
+        assert(res.getString("sessionName") == sparkBatchTestAppName)
+        assert(res.getString("sessionId") === batchSessionHandle.identifier.toString)
+        assert(res.getString("remoteSessionId") == "")
+        assert(res.getLong("startTime") > 0)
+        assert(res.getInt("totalOperations") == 0)
+        assert(res.getString("sessionType") === SessionType.BATCH.toString)
+      }
+    }
   }
 
   test("engine session id is not same with server session id") {