You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/04/15 08:43:00 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2353] [SUB-TASK][KPIP-4] Implement BatchJobSubmission operation and basic KyuubiBatchSessionImpl

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new b6d5c64cb [KYUUBI #2353] [SUB-TASK][KPIP-4] Implement BatchJobSubmission operation and basic KyuubiBatchSessionImpl
b6d5c64cb is described below

commit b6d5c64cb4e73319ece6a6cdc0a750d9197784a3
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Fri Apr 15 16:42:36 2022 +0800

    [KYUUBI #2353] [SUB-TASK][KPIP-4] Implement BatchJobSubmission operation and basic KyuubiBatchSessionImpl
    
    ### _Why are the changes needed?_
    
    To close #2306 and close #2307
    
    In this PR, I implement the BatchJobSubmission operation and introduce a basic `KyuubiBatchSessionImpl`.
    TODO:
    - Normalize/validate the batch request
      - batch request fields
      - merge with the server's pre-defined batch conf
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2353 from turboFei/KPIP_4_batch_submission_op.
    
    Closes #2353
    
    9bc6050c [Fei Wang] remove unused conf
    ef8e962c [Fei Wang] check application in current thread
    8738660b [Fei Wang] dedup code
    27f22008 [Fei Wang] use static secret id instead of conf
    6794ff7a [Fei Wang] Use Seq instead of java.util.List
    2f4f9b15 [Fei Wang] Remove BatchType enumeration
    7d380800 [Fei Wang] remove dead code
    a94a9e6a [Fei Wang] remove jars, files fields from BatchRequest
    6021a1e7 [Fei Wang] add ut for result set
    07a939c9 [Fei Wang] refactor long line
    a918a496 [Fei Wang] address comments
    73229e70 [Fei Wang] set engine max life time
    bbe3f1f4 [Fei Wang] make the application checker thread pool unique
    9643e42c [Fei Wang] refactor
    11dd71f7 [Fei Wang] add KyuubiBatchYarnClusterSuite
    12169910 [Fei Wang] add ut for batch session
    47da8c1a [Fei Wang] add open batch session api
    6dcf60d9 [Fei Wang] add ut for static batch secret id
    a212e62b [Fei Wang] [SUB-TASK][KPIP-4] Implement BatchJobSubmission operation and basic KyuubiBatchSessionImpl
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 docs/deployment/settings.md                        |   7 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |   7 +
 .../engine/spark/SparkBatchProcessBuilder.scala    |  13 +-
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  |  14 +-
 .../kyuubi/operation/BatchJobSubmission.scala      | 183 +++++++++++++++++++++
 .../kyuubi/operation/KyuubiOperationManager.scala  |  11 +-
 .../org/apache/kyuubi/server/api/v1/dto.scala      |   6 +-
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    |  74 +++++++++
 .../kyuubi/session/KyuubiSessionManager.scala      |  61 +++++++
 .../spark/SparkBatchProcessBuilderSuite.scala      |   5 +-
 .../operation/KyuubiBatchYarnClusterSuite.scala    |  83 ++++++++++
 11 files changed, 450 insertions(+), 14 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 7c5d63d97..6962dc0bb 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -154,6 +154,13 @@ Key | Default | Meaning | Type | Since
 <code>kyuubi.backend.server.exec.pool.wait.queue.size</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Size of the wait queue for the operation execution thread pool of Kyuubi server</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
 
 
+### Batch
+
+Key | Default | Meaning | Type | Since
+--- | --- | --- | --- | ---
+<code>kyuubi.batch.application.check.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The interval to check batch job application information.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
+
+
 ### Credentials
 
 Key | Default | Meaning | Type | Since
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 4e8fb9b51..81bc3259b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -806,6 +806,13 @@ object KyuubiConf {
       .booleanConf
       .createWithDefault(true)
 
+  val BATCH_APPLICATION_CHECK_INTERVAL: ConfigEntry[Long] =
+    buildConf("kyuubi.batch.application.check.interval")
+      .doc("The interval to check batch job application information.")
+      .version("1.6.0")
+      .timeConf // sleep between application id lookups in BatchJobSubmission
+      .createWithDefaultString("PT5S") // ISO-8601 duration string: 5 seconds
+
   val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
     buildConf("kyuubi.backend.server.exec.pool.size")
       .doc("Number of threads in the operation execution thread pool of Kyuubi server")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 92486acf7..108f13f64 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -47,7 +47,8 @@ class SparkBatchProcessBuilder(
     buffer += mainClass
 
     val batchJobTag = batchRequest.conf.get(TAG_KEY).map(_ + ",").getOrElse("") + batchId
-    val allConf = batchRequest.conf ++ Map(TAG_KEY -> batchJobTag)
+
+    val allConf = batchRequest.conf ++ Map(TAG_KEY -> batchJobTag) ++ sparkAppNameConf()
 
     allConf.foreach { case (k, v) =>
       buffer += CONF
@@ -59,15 +60,21 @@ class SparkBatchProcessBuilder(
 
     mainResource.foreach { r => buffer += r }
 
-    batchRequest.args.asScala.foreach { arg => buffer += arg }
+    batchRequest.args.foreach { arg => buffer += arg }
 
     buffer.toArray
   }
 
+  private def sparkAppNameConf(): Map[String, String] = {
+    // Only a non-null, non-empty batch name overrides spark.app.name.
+    val batchName = Option(batchRequest.name).getOrElse("")
+    if (batchName.isEmpty) Map.empty else Map(APP_KEY -> batchName)
+  }
+
   override protected def module: String = "kyuubi-spark-batch-submit"
 
   private[kyuubi] def getApplicationIdAndUrl(): Option[(String, String)] = {
-    batchRequest.conf.get("spark.master") match {
+    batchRequest.conf.get(MASTER_KEY).orElse(getSparkDefaultsConf().get(MASTER_KEY)) match {
       case Some("yarn") =>
         val yarnClient = getYarnClient
         val yarnConf = new YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 18870a611..12170c75b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.engine.spark
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.nio.file.Paths
 
 import scala.collection.mutable.ArrayBuffer
@@ -164,11 +164,20 @@ class SparkProcessBuilder(
     }
 
   override protected def shortName: String = "spark"
+
+  protected def getSparkDefaultsConf(): Map[String, String] = {
+    // SPARK_CONF_DIR wins over SPARK_HOME/conf; a missing file is passed on as None
+    val confDir = env.get(SPARK_CONF_DIR)
+      .orElse(env.get(SPARK_HOME).map(_ + File.separator + "conf"))
+    val defaultsFile = confDir.map(dir => new File(dir + File.separator + SPARK_CONF_FILE_NAME))
+    Utils.getPropertiesFromFile(defaultsFile.filter(_.exists()))
+  }
 }
 
 object SparkProcessBuilder {
   final val APP_KEY = "spark.app.name"
   final val TAG_KEY = "spark.yarn.tags"
+  final val MASTER_KEY = "spark.master"
 
   final private[spark] val CONF = "--conf"
   final private[spark] val CLASS = "--class"
@@ -178,4 +187,7 @@ object SparkProcessBuilder {
   final private[spark] val KEYTAB = "spark.kerberos.keytab"
   // Get the appropriate spark-submit file
   final private val SPARK_SUBMIT_FILE = if (Utils.isWindows) "spark-submit.cmd" else "spark-submit"
+  final private val SPARK_HOME = "SPARK_HOME"
+  final private val SPARK_CONF_DIR = "SPARK_CONF_DIR"
+  final private val SPARK_CONF_FILE_NAME = "spark-defaults.conf"
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
new file mode 100644
index 000000000..8303238c2
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import java.nio.ByteBuffer
+import java.util.{ArrayList => JArrayList, Locale}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hive.service.rpc.thrift._
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.ProcBuilder
+import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
+import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.server.api.v1.BatchRequest
+import org.apache.kyuubi.session.KyuubiBatchSessionImpl
+import org.apache.kyuubi.util.ThriftUtils
+
+/**
+ * Submits a batch job (currently only the SPARK batch type) as an external process, polls
+ * for the application id and URL of the submitted application, and waits for the process
+ * to exit. Once FINISHED, the result set holds at most one (ApplicationId, URL) row.
+ */
+class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest: BatchRequest)
+  extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
+
+  override def statement: String = "BATCH_JOB_SUBMISSION"
+
+  override def shouldRunAsync: Boolean = true
+
+  private lazy val _operationLog = OperationLog.createOperationLog(session, getHandle)
+
+  private var builder: ProcBuilder = _
+
+  // Set from the background submission thread; @volatile so other threads see the update.
+  @volatile
+  private[kyuubi] var appIdAndUrl: Option[(String, String)] = None
+
+  private var resultFetched: Boolean = _
+
+  private val applicationCheckInterval =
+    session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL)
+
+  override def getOperationLog: Option[OperationLog] = Option(_operationLog)
+
+  override protected def beforeRun(): Unit = {
+    OperationLog.setCurrentOperationLog(_operationLog)
+    setHasResultSet(false)
+    setState(OperationState.PENDING)
+  }
+
+  override protected def afterRun(): Unit = {
+    OperationLog.removeCurrentOperationLog()
+  }
+
+  override protected def runInternal(): Unit = {
+    val asyncOperation: Runnable = () => {
+      setState(OperationState.RUNNING)
+      try {
+        submitBatchJob()
+        setState(OperationState.FINISHED)
+      } catch onError()
+    }
+    try {
+      val opHandle = session.sessionManager.submitBackgroundOperation(asyncOperation)
+      setBackgroundHandle(opHandle)
+    } catch onError("submitting batch job submission operation in background, request rejected")
+  }
+
+  /**
+   * Starts the batch job process, polls for its application info while the process is
+   * alive, then waits for the process to exit.
+   *
+   * @throws UnsupportedOperationException if the batch type is not SPARK
+   * @throws KyuubiException if the process exits with a non-zero value
+   */
+  private def submitBatchJob(): Unit = {
+    builder = Option(batchRequest.batchType).map(_.toUpperCase(Locale.ROOT)) match {
+      case Some("SPARK") =>
+        new SparkBatchProcessBuilder(
+          session.user,
+          session.sessionConf,
+          session.batchId,
+          batchRequest,
+          getOperationLog)
+
+      case _ =>
+        throw new UnsupportedOperationException(s"Batch type ${batchRequest.batchType} unsupported")
+    }
+
+    try {
+      info(s"Submitting ${batchRequest.batchType} batch job: $builder")
+      val process = builder.start
+      // Poll only while the process is alive: without this check the loop never ends when
+      // no application info can be resolved (e.g. the submission fails before an application
+      // is created, or the master is not yarn), leaving the operation RUNNING forever.
+      while (appIdAndUrl.isEmpty && process.isAlive) {
+        updateApplicationInfo()
+        if (appIdAndUrl.isEmpty) {
+          Thread.sleep(applicationCheckInterval)
+        }
+      }
+      process.waitFor()
+      // The process may have exited between two polls; try one last lookup.
+      if (appIdAndUrl.isEmpty) {
+        updateApplicationInfo()
+      }
+      if (process.exitValue() != 0) {
+        throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
+      }
+    } finally {
+      builder.close()
+    }
+  }
+
+  /** Tries once to resolve the application id and URL; lookup failures are only logged. */
+  private def updateApplicationInfo(): Unit = {
+    try {
+      builder match {
+        case sparkBatchProcessBuilder: SparkBatchProcessBuilder =>
+          sparkBatchProcessBuilder.getApplicationIdAndUrl() match {
+            case Some(appInfo) => appIdAndUrl = Some(appInfo)
+            case _ =>
+          }
+
+        case _ =>
+      }
+    } catch {
+      case e: Exception => error(s"Failed to check batch application", e)
+    }
+  }
+
+  /** Schema of the result set: two STRING columns, ApplicationId and URL. */
+  override def getResultSetSchema: TTableSchema = {
+    val schema = new TTableSchema()
+    Seq("ApplicationId", "URL").zipWithIndex.foreach { case (colName, position) =>
+      val tColumnDesc = new TColumnDesc()
+      tColumnDesc.setColumnName(colName)
+      val tTypeDesc = new TTypeDesc()
+      tTypeDesc.addToTypes(TTypeEntry.primitiveEntry(new TPrimitiveTypeEntry(TTypeId.STRING_TYPE)))
+      tColumnDesc.setTypeDesc(tTypeDesc)
+      tColumnDesc.setPosition(position)
+      schema.addToColumns(tColumnDesc)
+    }
+    schema
+  }
+
+  /**
+   * Returns the application info row set. FETCH_NEXT yields it only on the first call and
+   * an empty row set afterwards; FETCH_FIRST and FETCH_PRIOR always return it.
+   */
+  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
+    validateDefaultFetchOrientation(order)
+    assertState(OperationState.FINISHED)
+    setHasResultSet(true)
+    order match {
+      case FETCH_NEXT => fetchNext()
+      case FETCH_PRIOR => resultSet
+      case FETCH_FIRST => resultSet
+    }
+  }
+
+  // Column-based row set with one row when appIdAndUrl is defined, zero rows otherwise.
+  private lazy val resultSet: TRowSet = {
+    val tRow = new TRowSet(0, new JArrayList[TRow](1))
+    val (appId, url) = appIdAndUrl.toSeq.unzip
+
+    val tAppIdColumn = TColumn.stringVal(new TStringColumn(
+      appId.asJava,
+      ByteBuffer.allocate(0)))
+
+    val tUrlColumn = TColumn.stringVal(new TStringColumn(
+      url.asJava,
+      ByteBuffer.allocate(0)))
+
+    tRow.addToColumns(tAppIdColumn)
+    tRow.addToColumns(tUrlColumn)
+    tRow
+  }
+
+  private def fetchNext(): TRowSet = {
+    if (!resultFetched) {
+      resultFetched = true
+      resultSet
+    } else {
+      ThriftUtils.EMPTY_ROW_SET
+    }
+  }
+
+  // Closes the process builder unless the operation is already closed or canceled.
+  override def close(): Unit = {
+    if (!isClosedOrCanceled) {
+      if (builder != null) {
+        builder.close()
+      }
+    }
+    super.close()
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index db2898b3e..e7a7d3e6c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -26,7 +26,8 @@ import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT
 import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
-import org.apache.kyuubi.session.{KyuubiSessionImpl, Session}
+import org.apache.kyuubi.server.api.v1.BatchRequest
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionImpl, Session}
 import org.apache.kyuubi.util.ThriftUtils
 
 class KyuubiOperationManager private (name: String) extends OperationManager(name) {
@@ -62,6 +63,14 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
     addOperation(operation)
   }
 
+  def newBatchJobSubmissionOperation(
+      session: KyuubiBatchSessionImpl,
+      batchRequest: BatchRequest): BatchJobSubmission = {
+    val submission = new BatchJobSubmission(session, batchRequest)
+    addOperation(submission) // register it with this manager before returning it
+    submission
+  }
+
   override def newGetTypeInfoOperation(session: Session): Operation = {
     val operation = new GetTypeInfo(session)
     addOperation(operation)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index ec62b84fe..395ab3eb7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -111,8 +111,6 @@ case class Field(dataType: String, value: Any)
  * @param resource the main resource jar, required.
  * @param proxyUser the proxy user, optional.
  * @param className the main class name, required.
- * @param jars comma-separated list of jars to include, optional.
- * @param files comma-separated list of files to include, optional.
  * @param name a name of your batch job, optional.
  * @param conf arbitrary configuration properties, optional.
  * @param args comma-separated list of batch job arguments, optional.
@@ -122,8 +120,6 @@ case class BatchRequest(
     resource: String,
     proxyUser: String,
     className: String,
-    jars: java.util.List[String],
-    files: java.util.List[String],
     name: String,
     conf: Map[String, String],
-    args: java.util.List[String])
+    args: Seq[String])
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
new file mode 100644
index 000000000..57beeb4eb
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.session
+
+import com.codahale.metrics.MetricRegistry
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.events.{EventBus, KyuubiEvent, KyuubiSessionEvent}
+import org.apache.kyuubi.metrics.MetricsConstants.{CONN_OPEN, CONN_TOTAL}
+import org.apache.kyuubi.metrics.MetricsSystem
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.server.api.v1.BatchRequest
+
+/**
+ * A session dedicated to a single batch job: open() runs the batch job submission
+ * operation created from batchRequest, and batchId is the string form of the session
+ * handle identifier.
+ */
+class KyuubiBatchSessionImpl(
+    protocol: TProtocolVersion,
+    user: String,
+    password: String,
+    ipAddress: String,
+    conf: Map[String, String],
+    sessionManager: KyuubiSessionManager,
+    val sessionConf: KyuubiConf,
+    batchRequest: BatchRequest)
+  extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
+  override val handle: SessionHandle = sessionManager.newBatchSessionHandle(protocol)
+  val batchId: String = handle.identifier.toString
+
+  // Set once the submission operation has been created, so that close() can tell whether
+  // there is anything to clean up without forcing the lazy creation below.
+  @volatile private var batchJobSubmissionOpCreated = false
+
+  private[kyuubi] lazy val batchJobSubmissionOp = {
+    val op = sessionManager.operationManager.newBatchJobSubmissionOperation(this, batchRequest)
+    batchJobSubmissionOpCreated = true
+    op
+  }
+
+  private val sessionEvent = KyuubiSessionEvent(this)
+  EventBus.post(sessionEvent)
+
+  override def getSessionEvent: Option[KyuubiEvent] = {
+    Option(sessionEvent)
+  }
+
+  override def open(): Unit = {
+    MetricsSystem.tracing { ms =>
+      ms.incCount(CONN_TOTAL)
+      ms.incCount(MetricRegistry.name(CONN_OPEN, user))
+    }
+
+    // we should call super.open before running batch job submission operation
+    super.open()
+
+    runOperation(batchJobSubmissionOp)
+  }
+
+  override def close(): Unit = {
+    // Only close the submission operation if it exists and is not yet terminal; touching
+    // the lazy val unconditionally would create (and register) a brand-new operation when
+    // open() failed before running it.
+    if (batchJobSubmissionOpCreated &&
+      !OperationState.isTerminal(batchJobSubmissionOp.getStatus.state)) {
+      closeOperation(batchJobSubmissionOp.getHandle)
+    }
+    super.close()
+    sessionEvent.endTime = System.currentTimeMillis()
+    EventBus.post(sessionEvent)
+    MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 4fb3dd9d8..982c14ca3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -17,10 +17,13 @@
 
 package org.apache.kyuubi.session
 
+import java.util.UUID
+
 import com.codahale.metrics.MetricRegistry
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.cli.HandleIdentifier
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.credentials.HadoopCredentialsManager
@@ -28,8 +31,10 @@ import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.KyuubiOperationManager
 import org.apache.kyuubi.plugin.{PluginLoader, SessionConfAdvisor}
+import org.apache.kyuubi.server.api.v1.BatchRequest
 
 class KyuubiSessionManager private (name: String) extends SessionManager(name) {
+  import KyuubiSessionManager._
 
   def this() = this(classOf[KyuubiSessionManager].getSimpleName)
 
@@ -84,6 +89,53 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
     }
   }
 
+  def openBatchSession(
+      protocol: TProtocolVersion,
+      user: String,
+      password: String,
+      ipAddress: String,
+      conf: Map[String, String],
+      batchRequest: BatchRequest): SessionHandle = {
+    // Creates and opens a batch session (which starts its batch job) and registers it. On
+    // failure the session is closed, CONN_FAIL metrics are bumped and a KyuubiSQLException is
+    // thrown. A null or empty user is normalized to "anonymous", and that name is used
+    // consistently for the session, its user-default conf, log messages and metrics.
+    val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
+    val batchSession = new KyuubiBatchSessionImpl(
+      protocol, username, password, ipAddress,
+      conf, this,
+      // TODO: user defaults conf for batch session
+      this.getConf.getUserDefaults(username),
+      batchRequest)
+    try {
+      val handle = batchSession.handle
+      batchSession.open()
+      setSession(handle, batchSession)
+      info(s"$username's batch session with $handle is opened, current opening sessions" +
+        s" $getOpenSessionCount")
+      handle
+    } catch {
+      case e: Exception =>
+        try {
+          batchSession.close()
+        } catch {
+          case t: Throwable =>
+            warn(s"Error closing batch session for $username client ip: $ipAddress", t)
+        }
+        MetricsSystem.tracing { ms =>
+          ms.incCount(CONN_FAIL)
+          ms.incCount(MetricRegistry.name(CONN_FAIL, username))
+        }
+        throw KyuubiSQLException(
+          s"Error opening batch session for $username client ip $ipAddress, due to ${e.getMessage}",
+          e)
+    }
+  }
+
+  def newBatchSessionHandle(protocol: TProtocolVersion): SessionHandle =
+    // random id, paired with the static secret id shared by all batch session handles
+    SessionHandle(HandleIdentifier(UUID.randomUUID(), STATIC_BATCH_SECRET_UUID), protocol)
+
   override def start(): Unit = synchronized {
     MetricsSystem.tracing { ms =>
       ms.registerGauge(CONN_OPEN, getOpenSessionCount, 0)
@@ -95,3 +147,12 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
 
   override protected def isServer: Boolean = true
 }
+
+object KyuubiSessionManager {
+  /**
+   * Fixed secret UUID used as the secret id of every batch session handle. It is kept
+   * constant for compatibility, so do not change its value.
+   */
+  val STATIC_BATCH_SECRET_UUID: UUID =
+    UUID.fromString("c2ee5b97-3ea0-41fc-ac16-9bd708ed8f38")
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
index de0f617e0..2dd8a0e5e 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.spark
 
 import java.util.UUID
 
-import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
 import org.apache.kyuubi.KyuubiFunSuite
@@ -57,11 +56,9 @@ class SparkBatchProcessBuilderSuite extends KyuubiFunSuite {
       sparkProcessBuilder.mainResource.get,
       "kyuubi",
       sparkProcessBuilder.mainClass,
-      List.empty[String].asJava,
-      List.empty[String].asJava,
       "spark-batch-submission",
       Map("spark.master" -> "yarn"),
-      List.empty[String].asJava)
+      Seq.empty[String])
 
     val builder = new SparkBatchProcessBuilder(
       batchRequest.proxyUser,
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
new file mode 100644
index 000000000..7f8835277
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.WithKyuubiServerOnYarn
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.server.api.v1.BatchRequest
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
+
+/**
+ * Submits a Spark batch job to YARN through a batch session and checks that the application
+ * id and URL are reported both on the operation and in its result set.
+ */
+class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
+  override protected val connectionConf: Map[String, String] = Map.empty
+
+  override protected val kyuubiServerConf: KyuubiConf = KyuubiConf()
+
+  private def sessionManager(): KyuubiSessionManager =
+    server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
+
+  test("open batch session") {
+    val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+
+    val batchRequest = BatchRequest(
+      "spark",
+      sparkProcessBuilder.mainResource.get,
+      "kyuubi",
+      sparkProcessBuilder.mainClass,
+      "spark-batch-submission",
+      Map(
+        "spark.master" -> "yarn",
+        s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
+        s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
+      Seq.empty[String])
+
+    val sessionHandle = sessionManager.openBatchSession(
+      TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
+      batchRequest.proxyUser,
+      "passwd",
+      "localhost",
+      batchRequest.conf,
+      batchRequest)
+
+    // Close the session even when an assertion fails, so the batch session is always
+    // released by this test.
+    try {
+      assert(sessionHandle.identifier.secretId === KyuubiSessionManager.STATIC_BATCH_SECRET_UUID)
+      val session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+      val batchJobSubmissionOp = session.batchJobSubmissionOp
+
+      eventually(timeout(3.minutes), interval(500.milliseconds)) {
+        val applicationIdAndUrl = batchJobSubmissionOp.appIdAndUrl
+        assert(applicationIdAndUrl.isDefined)
+        assert(applicationIdAndUrl.exists(_._1.startsWith("application_")))
+        assert(applicationIdAndUrl.exists(_._2.nonEmpty))
+        assert(batchJobSubmissionOp.getStatus.state === OperationState.FINISHED)
+      }
+
+      // FETCH_NEXT hands out the row only once, so fetch outside of `eventually`: a retried
+      // fetch would not see the row again and the column lookups below would fail.
+      val resultColumns = batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 1)
+        .getColumns.asScala
+      val appId = resultColumns.apply(0).getStringVal.getValues.asScala.apply(0)
+      val url = resultColumns.apply(1).getStringVal.getValues.asScala.apply(0)
+      assert(appId === batchJobSubmissionOp.appIdAndUrl.get._1)
+      assert(url === batchJobSubmissionOp.appIdAndUrl.get._2)
+    } finally {
+      sessionManager.closeSession(sessionHandle)
+    }
+  }
+}