You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/12/19 09:42:29 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3975] Support to post batch session/operation event
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 40009722c [KYUUBI #3975] Support to post batch session/operation event
40009722c is described below
commit 40009722cbeb54d1cc1c6e2ac1948bfd6667fb98
Author: fwang12 <fw...@ebay.com>
AuthorDate: Mon Dec 19 17:42:20 2022 +0800
[KYUUBI #3975] Support to post batch session/operation event
### _Why are the changes needed?_
Support posting batch session/operation events.
In addition, make the following improvements:
- wrap `session.open` with `handleSessionException`
- add `sessionType` to `KyuubiOperationEvent`
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3975 from turboFei/batch_session_event.
Closes #3975
d4bc151c5 [fwang12] save
ba4ee50c6 [fwang12] respect session name
9c8ff405b [fwang12] do not use session.name
2231ae752 [fwang12] session name
6fa8be581 [fwang12] session name
a69fc25e9 [fwang12] save
5832238d8 [fwang12] session type
42723bd2d [fwang12] check opened time
cfe537f60 [fwang12] do not treat launch engine as operation
806be6dd9 [fwang12] update session event
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../kyuubi/events/KyuubiOperationEvent.scala | 10 +++++---
.../kyuubi/operation/BatchJobSubmission.scala | 11 +++++++-
.../kyuubi/session/KyuubiBatchSessionImpl.scala | 18 ++++++++++---
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 2 +-
.../ServerJsonLoggingEventHandlerSuite.scala | 30 +++++++++++++++++++++-
5 files changed, 62 insertions(+), 9 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
index 34c23601f..74a3a3fad 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.events
import org.apache.kyuubi.Utils
import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
+import org.apache.kyuubi.session.KyuubiSession
/**
* A [[KyuubiOperationEvent]] used to tracker the lifecycle of an operation at server side.
@@ -40,6 +41,7 @@ import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
* @param exception: caught exception if have
* @param sessionId the identifier of the parent session
* @param sessionUser the authenticated client user
+ * @param sessionType the type of the parent session
*/
case class KyuubiOperationEvent private (
statementId: String,
@@ -53,7 +55,8 @@ case class KyuubiOperationEvent private (
completeTime: Long,
exception: Option[Throwable],
sessionId: String,
- sessionUser: String) extends KyuubiEvent {
+ sessionUser: String,
+ sessionType: String) extends KyuubiEvent {
// operation events are partitioned by the date when the corresponding operations are
// created.
@@ -67,7 +70,7 @@ object KyuubiOperationEvent {
* Shorthand for instantiating a operation event with a [[KyuubiOperation]] instance
*/
def apply(operation: KyuubiOperation): KyuubiOperationEvent = {
- val session = operation.getSession
+ val session = operation.getSession.asInstanceOf[KyuubiSession]
val status = operation.getStatus
new KyuubiOperationEvent(
operation.statementId,
@@ -81,6 +84,7 @@ object KyuubiOperationEvent {
status.completed,
status.exception,
session.handle.identifier.toString,
- session.user)
+ session.user,
+ session.sessionType.toString)
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 5912e0b00..e0eb7c05d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -29,10 +29,11 @@ import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, KillResponse, ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
+import org.apache.kyuubi.events.{EventBus, KyuubiOperationEvent}
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
-import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState}
+import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState, RUNNING}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.session.KyuubiBatchSessionImpl
@@ -60,6 +61,7 @@ class BatchJobSubmission(
recoveryMetadata: Option[Metadata])
extends KyuubiApplicationOperation(session) {
import BatchJobSubmission._
+ EventBus.post(KyuubiOperationEvent(this))
override def shouldRunAsync: Boolean = true
@@ -143,6 +145,13 @@ class BatchJobSubmission(
private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
if (state != CANCELED) {
setState(newState)
+ applicationInfo.filter(_.id != null).foreach { ai =>
+ session.getSessionEvent.foreach(_.engineId = ai.id)
+ }
+ if (newState == RUNNING) {
+ session.onEngineOpened()
+ }
+ EventBus.post(KyuubiOperationEvent(this))
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index feccc8ee0..8e5830293 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -77,11 +77,14 @@ class KyuubiBatchSessionImpl(
sessionManager.validateBatchConf(batchRequest.getConf.asScala.toMap)
}
+ override lazy val name: Option[String] = Option(batchRequest.getName).orElse(
+ normalizedConf.get(KyuubiConf.SESSION_NAME.key))
+
private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
.newBatchJobSubmissionOperation(
this,
batchRequest.getBatchType,
- batchRequest.getName,
+ name.orNull,
batchRequest.getResource,
batchRequest.getClassName,
normalizedConf,
@@ -101,6 +104,7 @@ class KyuubiBatchSessionImpl(
}
private val sessionEvent = KyuubiSessionEvent(this)
+ recoveryMetadata.map(metadata => sessionEvent.engineId = metadata.engineId)
EventBus.post(sessionEvent)
override def getSessionEvent: Option[KyuubiSessionEvent] = {
@@ -119,7 +123,7 @@ class KyuubiBatchSessionImpl(
}
}
- override def open(): Unit = {
+ override def open(): Unit = handleSessionException {
MetricsSystem.tracing { ms =>
ms.incCount(CONN_TOTAL)
ms.incCount(MetricRegistry.name(CONN_OPEN, user))
@@ -136,7 +140,7 @@ class KyuubiBatchSessionImpl(
state = OperationState.PENDING.toString,
resource = batchRequest.getResource,
className = batchRequest.getClassName,
- requestName = batchRequest.getName,
+ requestName = name.orNull,
requestConf = normalizedConf,
requestArgs = batchRequest.getArgs.asScala,
createTime = createTime,
@@ -152,6 +156,14 @@ class KyuubiBatchSessionImpl(
super.open()
runOperation(batchJobSubmissionOp)
+ sessionEvent.totalOperations += 1
+ }
+
+ private[kyuubi] def onEngineOpened(): Unit = {
+ if (sessionEvent.openedTime <= 0) {
+ sessionEvent.openedTime = System.currentTimeMillis()
+ EventBus.post(sessionEvent)
+ }
}
override def close(): Unit = {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index fb6aa60b8..5f6866bc8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -108,7 +108,7 @@ class KyuubiSessionImpl(
private var _engineSessionHandle: SessionHandle = _
- override def open(): Unit = {
+ override def open(): Unit = handleSessionException {
MetricsSystem.tracing { ms =>
ms.incCount(CONN_TOTAL)
ms.incCount(MetricRegistry.name(CONN_OPEN, user))
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
index 8022b6cf3..7fca01efd 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
@@ -35,8 +35,10 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.OperationState._
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.service.ServiceState
+import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType}
-class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCTestHelper {
+class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCTestHelper
+ with BatchTestHelper {
private val engineLogRoot = "file://" + Utils.createTempDir().toString
private val serverLogRoot = "file://" + Utils.createTempDir().toString
@@ -116,6 +118,7 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT
assert(res.getString("remoteSessionId") == "")
assert(res.getLong("startTime") > 0)
assert(res.getInt("totalOperations") == 0)
+ assert(res.getString("sessionType") === SessionType.SQL.toString)
assert(res.next())
assert(res.getInt("totalOperations") == 0)
assert(res.getString("sessionId") == sid)
@@ -123,10 +126,35 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT
assert(res.getLong("openedTime") > 0)
assert(res.next())
assert(res.getInt("totalOperations") == 1)
+ assert(res.getString("sessionType") === SessionType.SQL.toString)
assert(res.getLong("endTime") > 0)
assert(!res.next())
}
}
+
+ val batchRequest = newSparkBatchRequest()
+ val sessionMgr = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
+ val batchSessionHandle = sessionMgr.openBatchSession(
+ Utils.currentUser,
+ "kyuubi",
+ "127.0.0.1",
+ Map.empty,
+ batchRequest)
+ withSessionConf()(Map.empty)(Map("spark.sql.shuffle.partitions" -> "2")) {
+ withJdbcStatement() { statement =>
+ val res = statement.executeQuery(
+ s"SELECT * FROM `json`.`$serverSessionEventPath` " +
+ s"where sessionName = '${sparkBatchTestAppName}' order by totalOperations")
+ assert(res.next())
+ assert(res.getString("user") == Utils.currentUser)
+ assert(res.getString("sessionName") == sparkBatchTestAppName)
+ assert(res.getString("sessionId") === batchSessionHandle.identifier.toString)
+ assert(res.getString("remoteSessionId") == "")
+ assert(res.getLong("startTime") > 0)
+ assert(res.getInt("totalOperations") == 0)
+ assert(res.getString("sessionType") === SessionType.BATCH.toString)
+ }
+ }
}
test("engine session id is not same with server session id") {