You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/05/07 11:31:54 UTC

[kyuubi] branch master updated: [KYUUBI #4798] Allows BatchJobSubmission to run in sync mode

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ae3b81395 [KYUUBI #4798] Allows BatchJobSubmission to run in sync mode
ae3b81395 is described below

commit ae3b81395c9a5805d226d56671c2acdbbf365a31
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Sun May 7 19:31:45 2023 +0800

    [KYUUBI #4798] Allows BatchJobSubmission to run in sync mode
    
    ### _Why are the changes needed?_
    
    Currently, BatchJobSubmission is only allowed to run in async mode. This PR makes `shouldRunAsync` configurable and allows BatchJobSubmission to run in sync mode. (To minimize the change, in sync mode the real submission and monitoring still happen on the exec pool; the BatchJobSubmission just blocks until the batch is finished.)
    
    This PR also refactors the constructor parameters of `KyuubiBatchSessionImpl` and unwraps the BatchRequest to make it fit the Batch V2 design.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4798 from pan3793/batch-sync.
    
    Closes #4798
    
    38eee2708 [Cheng Pan] Allows BatchJobSubmission run in sync mode
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../kyuubi/operation/BatchJobSubmission.scala      | 52 ++++++++----------
 .../kyuubi/operation/KyuubiOperationManager.scala  |  6 ++-
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    | 57 ++++++++++----------
 .../kyuubi/session/KyuubiSessionManager.scala      | 62 +++++++++++++++-------
 4 files changed, 98 insertions(+), 79 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 702a9a917..e77416d31 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -58,12 +58,11 @@ class BatchJobSubmission(
     className: String,
     batchConf: Map[String, String],
     batchArgs: Seq[String],
-    recoveryMetadata: Option[Metadata])
+    recoveryMetadata: Option[Metadata],
+    override val shouldRunAsync: Boolean)
   extends KyuubiApplicationOperation(session) {
   import BatchJobSubmission._
 
-  override def shouldRunAsync: Boolean = true
-
   private val _operationLog = OperationLog.createOperationLog(session, getHandle)
 
   private val applicationManager = session.sessionManager.applicationManager
@@ -131,17 +130,10 @@ class BatchJobSubmission(
     session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_STARVATION_TIMEOUT)
 
   private def updateBatchMetadata(): Unit = {
-    val endTime =
-      if (isTerminalState(state)) {
-        lastAccessTime
-      } else {
-        0L
-      }
+    val endTime = if (isTerminalState(state)) lastAccessTime else 0L
 
-    if (isTerminalState(state)) {
-      if (_applicationInfo.isEmpty) {
-        _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
-      }
+    if (isTerminalState(state) && _applicationInfo.isEmpty) {
+      _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
     }
 
     _applicationInfo.foreach { appInfo =>
@@ -187,27 +179,24 @@ class BatchJobSubmission(
   override protected def runInternal(): Unit = session.handleSessionException {
     val asyncOperation: Runnable = () => {
       try {
-        if (recoveryMetadata.exists(_.peerInstanceClosed)) {
-          setState(OperationState.CANCELED)
-        } else {
-          // If it is in recovery mode, only re-submit batch job if previous state is PENDING and
-          // fail to fetch the status including appId from resource manager. Otherwise, monitor the
-          // submitted batch application.
-          recoveryMetadata.map { metadata =>
-            if (metadata.state == OperationState.PENDING.toString) {
-              _applicationInfo = currentApplicationInfo()
-              applicationId(_applicationInfo) match {
-                case Some(appId) => monitorBatchJob(appId)
-                case None => submitAndMonitorBatchJob()
-              }
-            } else {
-              monitorBatchJob(metadata.engineId)
+        recoveryMetadata match {
+          case Some(metadata) if metadata.peerInstanceClosed =>
+            setState(OperationState.CANCELED)
+          case Some(metadata) if metadata.state == OperationState.PENDING.toString =>
+            // In recovery mode, only submit batch job when previous state is PENDING
+            // and fail to fetch the status including appId from resource manager.
+            // Otherwise, monitor the submitted batch application.
+            _applicationInfo = currentApplicationInfo()
+            applicationId(_applicationInfo) match {
+              case Some(appId) => monitorBatchJob(appId)
+              case None => submitAndMonitorBatchJob()
             }
-          }.getOrElse {
+          case Some(metadata) =>
+            monitorBatchJob(metadata.engineId)
+          case None =>
             submitAndMonitorBatchJob()
-          }
-          setStateIfNotCanceled(OperationState.FINISHED)
         }
+        setStateIfNotCanceled(OperationState.FINISHED)
       } catch {
         onError()
       } finally {
@@ -225,6 +214,7 @@ class BatchJobSubmission(
         updateBatchMetadata()
       }
     }
+    if (!shouldRunAsync) getBackgroundHandle.get()
   }
 
   private def submitAndMonitorBatchJob(): Unit = {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index dd4889653..6846d0316 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -81,7 +81,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
       className: String,
       batchConf: Map[String, String],
       batchArgs: Seq[String],
-      recoveryMetadata: Option[Metadata]): BatchJobSubmission = {
+      recoveryMetadata: Option[Metadata],
+      shouldRunAsync: Boolean): BatchJobSubmission = {
     val operation = new BatchJobSubmission(
       session,
       batchType,
@@ -90,7 +91,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
       className,
       batchConf,
       batchArgs,
-      recoveryMetadata)
+      recoveryMetadata,
+      shouldRunAsync)
     addOperation(operation)
     operation
   }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 94859a08c..ba2046829 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
-import org.apache.kyuubi.client.api.v1.dto.BatchRequest
 import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.engine.KyuubiApplicationManager
@@ -38,8 +37,14 @@ class KyuubiBatchSessionImpl(
     conf: Map[String, String],
     override val sessionManager: KyuubiSessionManager,
     val sessionConf: KyuubiConf,
-    batchRequest: BatchRequest,
-    recoveryMetadata: Option[Metadata] = None)
+    batchType: String,
+    batchName: Option[String],
+    resource: String,
+    className: String,
+    batchConf: Map[String, String],
+    batchArgs: Seq[String],
+    recoveryMetadata: Option[Metadata] = None,
+    shouldRunAsync: Boolean)
   extends KyuubiSession(
     TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
     user,
@@ -68,42 +73,41 @@ class KyuubiBatchSessionImpl(
   override val sessionIdleTimeoutThreshold: Long =
     sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT)
 
-  override val normalizedConf: Map[String, String] = {
-    sessionConf.getBatchConf(batchRequest.getBatchType) ++
-      sessionManager.validateBatchConf(batchRequest.getConf.asScala.toMap)
-  }
+  override val normalizedConf: Map[String, String] =
+    sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(batchConf)
 
-  private val optimizedConf: Map[String, String] = {
+  val optimizedConf: Map[String, String] = {
     val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
       user,
       normalizedConf.asJava)
     if (confOverlay != null) {
       val overlayConf = new KyuubiConf(false)
       confOverlay.asScala.foreach { case (k, v) => overlayConf.set(k, v) }
-      normalizedConf ++ overlayConf.getBatchConf(batchRequest.getBatchType)
+      normalizedConf ++ overlayConf.getBatchConf(batchType)
     } else {
       warn(s"the server plugin return null value for user: $user, ignore it")
       normalizedConf
     }
   }
 
-  override lazy val name: Option[String] = Option(batchRequest.getName).orElse(
-    optimizedConf.get(KyuubiConf.SESSION_NAME.key))
+  override lazy val name: Option[String] =
+    batchName.filterNot(_.trim.isEmpty).orElse(optimizedConf.get(KyuubiConf.SESSION_NAME.key))
 
   // whether the resource file is from uploading
-  private[kyuubi] val isResourceUploaded: Boolean = batchRequest.getConf
-    .getOrDefault(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
+  private[kyuubi] val isResourceUploaded: Boolean =
+    batchConf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
 
   private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
     .newBatchJobSubmissionOperation(
       this,
-      batchRequest.getBatchType,
+      batchType,
       name.orNull,
-      batchRequest.getResource,
-      batchRequest.getClassName,
+      resource,
+      className,
       optimizedConf,
-      batchRequest.getArgs.asScala,
-      recoveryMetadata)
+      batchArgs,
+      recoveryMetadata,
+      shouldRunAsync)
 
   private def waitMetadataRequestsRetryCompletion(): Unit = {
     val batchId = batchJobSubmissionOp.batchId
@@ -127,14 +131,11 @@ class KyuubiBatchSessionImpl(
 
   override def checkSessionAccessPathURIs(): Unit = {
     KyuubiApplicationManager.checkApplicationAccessPaths(
-      batchRequest.getBatchType,
+      batchType,
       optimizedConf,
       sessionManager.getConf)
-    if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE
-      && !isResourceUploaded) {
-      KyuubiApplicationManager.checkApplicationAccessPath(
-        batchRequest.getResource,
-        sessionManager.getConf)
+    if (resource != SparkProcessBuilder.INTERNAL_RESOURCE && !isResourceUploaded) {
+      KyuubiApplicationManager.checkApplicationAccessPath(resource, sessionManager.getConf)
     }
   }
 
@@ -150,13 +151,13 @@ class KyuubiBatchSessionImpl(
         ipAddress = ipAddress,
         kyuubiInstance = connectionUrl,
         state = OperationState.PENDING.toString,
-        resource = batchRequest.getResource,
-        className = batchRequest.getClassName,
+        resource = resource,
+        className = className,
         requestName = name.orNull,
         requestConf = optimizedConf,
-        requestArgs = batchRequest.getArgs.asScala,
+        requestArgs = batchArgs,
         createTime = createTime,
-        engineType = batchRequest.getBatchType,
+        engineType = batchType,
         clusterManager = batchJobSubmissionOp.builder.clusterManager())
 
       // there is a chance that operation failed w/ duplicated key error
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index b0ed144a5..0ef3f1ac1 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -124,23 +124,38 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
     }
   }
 
-  private def createBatchSession(
+  // scalastyle:off
+  def createBatchSession(
       user: String,
       password: String,
       ipAddress: String,
       conf: Map[String, String],
-      batchRequest: BatchRequest,
-      recoveryMetadata: Option[Metadata] = None): KyuubiBatchSessionImpl = {
+      batchType: String,
+      batchName: Option[String],
+      resource: String,
+      className: String,
+      batchConf: Map[String, String],
+      batchArgs: Seq[String],
+      recoveryMetadata: Option[Metadata] = None,
+      shouldRunAsync: Boolean): KyuubiBatchSessionImpl = {
+    // scalastyle:on
     val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
+    val sessionConf = this.getConf.getUserDefaults(user)
     new KyuubiBatchSessionImpl(
       username,
       password,
       ipAddress,
       conf,
       this,
-      this.getConf.getUserDefaults(user),
-      batchRequest,
-      recoveryMetadata)
+      sessionConf,
+      batchType,
+      batchName,
+      resource,
+      className,
+      batchConf,
+      batchArgs,
+      recoveryMetadata,
+      shouldRunAsync)
   }
 
   private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSessionImpl): SessionHandle = {
@@ -178,8 +193,21 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
       password: String,
       ipAddress: String,
       conf: Map[String, String],
-      batchRequest: BatchRequest): SessionHandle = {
-    val batchSession = createBatchSession(user, password, ipAddress, conf, batchRequest)
+      batchRequest: BatchRequest,
+      shouldRunAsync: Boolean = true): SessionHandle = {
+    val batchSession = createBatchSession(
+      user,
+      password,
+      ipAddress,
+      conf,
+      batchRequest.getBatchType,
+      Option(batchRequest.getName),
+      batchRequest.getResource,
+      batchRequest.getClassName,
+      batchRequest.getConf.asScala.toMap,
+      batchRequest.getArgs.asScala,
+      None,
+      shouldRunAsync)
     openBatchSession(batchSession)
   }
 
@@ -246,21 +274,19 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
         kyuubiInstance,
         0,
         Int.MaxValue).map { metadata =>
-        val batchRequest = new BatchRequest(
-          metadata.engineType,
-          metadata.resource,
-          metadata.className,
-          metadata.requestName,
-          metadata.requestConf.asJava,
-          metadata.requestArgs.asJava)
-
         createBatchSession(
           metadata.username,
           "anonymous",
           metadata.ipAddress,
           metadata.requestConf,
-          batchRequest,
-          Some(metadata))
+          metadata.engineType,
+          Option(metadata.requestName),
+          metadata.resource,
+          metadata.className,
+          metadata.requestConf,
+          metadata.requestArgs,
+          Some(metadata),
+          shouldRunAsync = true)
       }).getOrElse(Seq.empty)
     }
   }