You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/06/08 02:32:01 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2373][SUB-TASK][KPIP-4] Support recovering batch sessions on Kyuubi instance restart

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new e72572515 [KYUUBI #2373][SUB-TASK][KPIP-4] Support recovering batch sessions on Kyuubi instance restart
e72572515 is described below

commit e725725155dc56460518e2c0e5df5ffa562aa5f8
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Wed Jun 8 10:31:50 2022 +0800

    [KYUUBI #2373][SUB-TASK][KPIP-4] Support recovering batch sessions on Kyuubi instance restart
    
    ### _Why are the changes needed?_
    
    To close #2373
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2790 from turboFei/recovery_batch.
    
    Closes #2373
    
    81632191 [Fei Wang] trigger test
    6ac6f9bd [Fei Wang] async recovery
    6d9edf4e [Fei Wang] comments
    daa6719f [Fei Wang] refactor
    0c3a2e14 [Fei Wang] remove waitAppCompletion
    04e139ee [Fei Wang] comments
    b5d11d8f [Fei Wang] comment for method name
    321dfa95 [Fei Wang] refactor
    90be2df7 [Fei Wang] address comments
    14409582 [Fei Wang] batch recovery
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 docs/deployment/settings.md                        |   2 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  14 +++
 .../scala/org/apache/kyuubi/util/ThreadUtils.scala |   7 +-
 .../org/apache/kyuubi/engine/ProcBuilder.scala     |   4 +
 .../kyuubi/operation/BatchJobSubmission.scala      |  69 +++++++++++++-
 .../kyuubi/operation/KyuubiOperationManager.scala  |   7 +-
 .../kyuubi/server/KyuubiRestFrontendService.scala  |  50 +++++++++-
 .../server/statestore/SessionStateStore.scala      |  18 ++++
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    |  52 ++++++-----
 .../kyuubi/session/KyuubiSessionManager.scala      |  78 ++++++++++++++--
 .../server/api/v1/BatchesResourceSuite.scala       | 101 ++++++++++++++++++++-
 11 files changed, 357 insertions(+), 45 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 78dc2f9fa..3650caa5d 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -360,6 +360,8 @@ kyuubi.server.state.store.jdbc.password||The password for server jdbc state stor
 kyuubi.server.state.store.jdbc.url|jdbc:derby:memory:kyuubi_state_store_db;create=true|The jdbc url for server jdbc state store. By defaults, it is a DERBY in-memory database url, and the state information is not shared across kyuubi instances. To enable multiple kyuubi instances high available, please specify a production jdbc url.|string|1.6.0
 kyuubi.server.state.store.jdbc.user||The username for server jdbc state store.|string|1.6.0
 kyuubi.server.state.store.max.age|PT72H|The maximum age of state info in state store.|duration|1.6.0
+kyuubi.server.state.store.sessions.recovery.num.threads|10|The number of threads for sessions recovery from state store.|int|1.6.0
+kyuubi.server.state.store.sessions.recovery.per.batch|100|The number of sessions to recover from state store per batch.|int|1.6.0
 
 
 ### Session
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 7bd3879fc..179cf848b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -884,6 +884,20 @@ object KyuubiConf {
       .timeConf
       .createWithDefault(Duration.ofMinutes(30).toMillis)
 
+  val SERVER_STATE_STORE_SESSIONS_RECOVERY_PER_BATCH: ConfigEntry[Int] =
+    buildConf("kyuubi.server.state.store.sessions.recovery.per.batch")
+      .doc("The number of sessions to recover from state store per batch.")
+      .version("1.6.0")
+      .intConf
+      .createWithDefault(100)
+
+  val SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS: ConfigEntry[Int] =
+    buildConf("kyuubi.server.state.store.sessions.recovery.num.threads")
+      .doc("The number of threads for sessions recovery from state store.")
+      .version("1.6.0")
+      .intConf
+      .createWithDefault(10)
+
   val ENGINE_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
     buildConf("kyuubi.backend.engine.exec.pool.wait.queue.size")
       .doc("Size of the wait queue for the operation execution thread pool in SQL engine" +
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index c5a3944e6..e5552c914 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.util
 
-import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
 
 import scala.concurrent.Awaitable
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -53,6 +53,11 @@ object ThreadUtils extends Logging {
     executor
   }
 
+  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+    val threadFactory = new NamedThreadFactory(prefix, daemon = true)
+    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
+
   def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
     try {
       // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 8734b991b..9f0aae4b8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -24,6 +24,7 @@ import java.nio.file.{Files, Path, Paths}
 
 import scala.collection.JavaConverters._
 
+import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.EvictingQueue
 import org.apache.commons.lang3.StringUtils.containsIgnoreCase
 
@@ -152,6 +153,8 @@ trait ProcBuilder {
   @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
   private var logCaptureThread: Thread = _
   private var process: Process = _
+  @VisibleForTesting
+  @volatile private[kyuubi] var processLaunched: Boolean = _
 
   private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
     val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@@ -191,6 +194,7 @@ trait ProcBuilder {
 
   final def start: Process = synchronized {
     process = processBuilder.start()
+    processLaunched = true
     val reader = Files.newBufferedReader(engineLog.toPath, StandardCharsets.UTF_8)
 
     val redirect: Runnable = { () =>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 3cb9a3ce4..308ef0be2 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -25,17 +25,20 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 
 import com.codahale.metrics.MetricRegistry
+import com.google.common.annotations.VisibleForTesting
 import org.apache.hive.service.rpc.thrift._
 
 import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse, ProcBuilder}
+import org.apache.kyuubi.engine.ApplicationOperation._
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
 import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.server.statestore.api.SessionMetadata
 import org.apache.kyuubi.session.KyuubiBatchSessionImpl
 import org.apache.kyuubi.util.ThriftUtils
 
@@ -58,7 +61,8 @@ class BatchJobSubmission(
     resource: String,
     className: String,
     batchConf: Map[String, String],
-    batchArgs: Seq[String])
+    batchArgs: Seq[String],
+    recoveryMetadata: Option[SessionMetadata])
   extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
 
   override def statement: String = "BATCH_JOB_SUBMISSION"
@@ -76,7 +80,8 @@ class BatchJobSubmission(
   private var killMessage: KillResponse = (false, "UNKNOWN")
   def getKillMessage: KillResponse = killMessage
 
-  private val builder: ProcBuilder = {
+  @VisibleForTesting
+  private[kyuubi] val builder: ProcBuilder = {
     Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
       case Some("SPARK") =>
         new SparkBatchProcessBuilder(
@@ -143,7 +148,22 @@ class BatchJobSubmission(
     val asyncOperation: Runnable = () => {
       setStateIfNotCanceled(OperationState.RUNNING)
       try {
-        submitBatchJob()
+        // In recovery mode, re-submit the batch job only if its previous state is PENDING and
+        // no appId can be fetched from the resource manager (including when no status is
+        // returned at all). Otherwise, monitor the already-submitted batch application.
+        recoveryMetadata.map { metadata =>
+          if (metadata.state == OperationState.PENDING.toString) {
+            applicationStatus = currentApplicationState
+            applicationStatus.flatMap(_.get(APP_ID_KEY)) match {
+              case Some(appId) => monitorBatchJob(appId)
+              case None => submitAndMonitorBatchJob()
+            }
+          } else {
+            monitorBatchJob(metadata.engineId)
+          }
+        }.getOrElse {
+          submitAndMonitorBatchJob()
+        }
         setStateIfNotCanceled(OperationState.FINISHED)
       } catch {
         onError()
@@ -169,10 +189,15 @@ class BatchJobSubmission(
       s.contains("KILLED") || s.contains("FAILED"))
   }
 
-  private def submitBatchJob(): Unit = {
+  private def applicationTerminated(applicationStatus: Option[Map[String, String]]): Boolean = {
+    applicationStatus.map(_.get(ApplicationOperation.APP_STATE_KEY)).exists(s =>
+      s.contains("KILLED") || s.contains("FAILED") || s.contains("FINISHED"))
+  }
+
+  private def submitAndMonitorBatchJob(): Unit = {
     var appStatusFirstUpdated = false
     try {
-      info(s"Submitting $batchType batch job: $builder")
+      info(s"Submitting $batchType batch[$batchId] job: $builder")
       val process = builder.start
       applicationStatus = currentApplicationState
       while (!applicationFailed(applicationStatus) && process.isAlive) {
@@ -192,12 +217,46 @@ class BatchJobSubmission(
         if (process.exitValue() != 0) {
           throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
         }
+
+        applicationStatus.map(_.get(APP_ID_KEY)).map {
+          case Some(appId) => monitorBatchJob(appId)
+          case _ =>
+        }
       }
     } finally {
       builder.close()
     }
   }
 
+  private def monitorBatchJob(appId: String): Unit = {
+    info(s"Monitoring submitted $batchType batch[$batchId] job: $appId")
+    if (applicationStatus.isEmpty) {
+      applicationStatus = currentApplicationState
+    }
+    if (applicationStatus.isEmpty) {
+      info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
+    } else if (applicationFailed(applicationStatus)) {
+      throw new RuntimeException(s"$batchType batch[$batchId] job failed:" +
+        applicationStatus.get.mkString(","))
+    } else {
+      // TODO: add limit for max batch job submission lifetime
+      while (applicationStatus.isDefined && !applicationTerminated(applicationStatus)) {
+        Thread.sleep(applicationCheckInterval)
+        val newApplicationStatus = currentApplicationState
+        if (newApplicationStatus != applicationStatus) {
+          applicationStatus = newApplicationStatus
+          info(s"Batch report for $batchId" +
+            applicationStatus.map(_.mkString("(", ",", ")")).getOrElse("()"))
+        }
+      }
+
+      if (applicationFailed(applicationStatus)) {
+        throw new RuntimeException(s"$batchType batch[$batchId] job failed:" +
+          applicationStatus.get.mkString(","))
+      }
+    }
+  }
+
   def getOperationLogRowSet(
       order: FetchOrientation,
       from: Int,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index d542d0995..00a78e2f8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -26,6 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT
 import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.server.statestore.api.SessionMetadata
 import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionImpl, Session}
 import org.apache.kyuubi.util.ThriftUtils
 
@@ -69,7 +70,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
       resource: String,
       className: String,
       batchConf: Map[String, String],
-      batchArgs: Seq[String]): BatchJobSubmission = {
+      batchArgs: Seq[String],
+      recoveryMetadata: Option[SessionMetadata]): BatchJobSubmission = {
     val operation = new BatchJobSubmission(
       session,
       batchType,
@@ -77,7 +79,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
       resource,
       className,
       batchConf,
-      batchArgs)
+      batchArgs,
+      recoveryMetadata)
     addOperation(operation)
     operation
   }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 3eb3ec63e..52dc93238 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -18,20 +18,24 @@
 package org.apache.kyuubi.server
 
 import java.util.EnumSet
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.Future
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import javax.servlet.DispatcherType
 
+import com.google.common.annotations.VisibleForTesting
 import org.apache.hadoop.conf.Configuration
 import org.eclipse.jetty.servlet.FilterHolder
 
 import org.apache.kyuubi.{KyuubiException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT}
+import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT, SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS}
 import org.apache.kyuubi.server.api.v1.ApiRootResource
 import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
 import org.apache.kyuubi.server.ui.JettyServer
 import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
 import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
+import org.apache.kyuubi.session.KyuubiSessionManager
+import org.apache.kyuubi.util.ThreadUtils
 
 /**
  * A frontend service based on RESTful api via HTTP protocol.
@@ -46,6 +50,8 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
 
   private def hadoopConf: Configuration = KyuubiServer.getHadoopConf()
 
+  private def sessionManager = be.sessionManager.asInstanceOf[KyuubiSessionManager]
+
   override def initialize(conf: KyuubiConf): Unit = synchronized {
     val host = conf.get(FRONTEND_REST_BIND_HOST)
       .getOrElse(Utils.findLocalInetAddress.getHostAddress)
@@ -71,10 +77,50 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
     server.addRedirectHandler("/docs", "/swagger")
   }
 
+  @VisibleForTesting
+  private[kyuubi] def recoverBatchSessions(): Unit = {
+    val recoveryNumThreads = conf.get(SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS)
+    val batchRecoveryExecutor =
+      ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads, "batch-recovery-executor")
+    try {
+      val batchSessionsToRecover = sessionManager.getBatchSessionsToRecover(connectionUrl)
+      val pendingRecoveryTasksCount = new AtomicInteger(0)
+      val tasks = batchSessionsToRecover.flatMap { batchSession =>
+        val batchId = batchSession.batchJobSubmissionOp.batchId
+        try {
+          val task: Future[Unit] = batchRecoveryExecutor.submit(() =>
+            Utils.tryLogNonFatalError(sessionManager.openBatchSession(batchSession)))
+          Some(task -> batchId)
+        } catch {
+          case e: Throwable =>
+            error(s"Error while submitting batch[$batchId] for recovery", e)
+            None
+        }
+      }
+
+      pendingRecoveryTasksCount.addAndGet(tasks.size)
+
+      tasks.foreach { case (task, batchId) =>
+        try {
+          task.get()
+        } catch {
+          case e: Throwable =>
+            error(s"Error while recovering batch[$batchId]", e)
+        } finally {
+          val pendingTasks = pendingRecoveryTasksCount.decrementAndGet()
+          info(s"Batch[$batchId] recovery task terminated, current pending tasks $pendingTasks")
+        }
+      }
+    } finally {
+      ThreadUtils.shutdown(batchRecoveryExecutor)
+    }
+  }
+
   override def start(): Unit = synchronized {
     if (!isStarted.get) {
       try {
         server.start()
+        recoverBatchSessions()
         isStarted.set(true)
         info(s"$getName has started at ${server.getServerUri}")
         startInternal()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala
index 0ef14b00c..1f60ce357 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala
@@ -83,6 +83,24 @@ class SessionStateStore extends AbstractService("SessionStateStore") {
       true).map(buildBatch)
   }
 
+  def getBatchesRecoveryMetadata(
+      state: String,
+      kyuubiInstance: String,
+      from: Int,
+      size: Int): Seq[SessionMetadata] = {
+    _stateStore.getMetadataList(
+      SessionType.BATCH,
+      null,
+      null,
+      state,
+      kyuubiInstance,
+      0,
+      0,
+      from,
+      size,
+      false)
+  }
+
   def updateBatchMetadata(
       batchId: String,
       state: String,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 9ffc8179b..85bfb32cb 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -40,11 +40,16 @@ class KyuubiBatchSessionImpl(
     conf: Map[String, String],
     override val sessionManager: KyuubiSessionManager,
     val sessionConf: KyuubiConf,
-    batchRequest: BatchRequest)
+    batchRequest: BatchRequest,
+    recoveryMetadata: Option[SessionMetadata] = None)
   extends KyuubiSession(protocol, user, password, ipAddress, conf, sessionManager) {
   override val sessionType: SessionType = SessionType.BATCH
 
-  override val handle: SessionHandle = sessionManager.newBatchSessionHandle(protocol)
+  override val handle: SessionHandle = recoveryMetadata.map { metadata =>
+    sessionManager.getBatchSessionHandle(metadata.identifier, protocol)
+  }.getOrElse(sessionManager.newBatchSessionHandle(protocol))
+
+  override def createTime: Long = recoveryMetadata.map(_.createTime).getOrElse(super.createTime)
 
   // TODO: Support batch conf advisor
   override val normalizedConf: Map[String, String] = {
@@ -60,7 +65,8 @@ class KyuubiBatchSessionImpl(
       batchRequest.getResource,
       batchRequest.getClassName,
       normalizedConf,
-      batchRequest.getArgs.asScala)
+      batchRequest.getArgs.asScala,
+      recoveryMetadata)
 
   private val sessionEvent = KyuubiSessionEvent(this)
   EventBus.post(sessionEvent)
@@ -75,25 +81,27 @@ class KyuubiBatchSessionImpl(
       ms.incCount(MetricRegistry.name(CONN_OPEN, user))
     }
 
-    val metaData = SessionMetadata(
-      identifier = handle.identifier.toString,
-      sessionType = sessionType,
-      // TODO: support real user
-      realUser = user,
-      username = user,
-      ipAddress = ipAddress,
-      // TODO: support to transfer fe connection url when opening session
-      kyuubiInstance = KyuubiRestFrontendService.getConnectionUrl,
-      state = OperationState.PENDING.toString,
-      resource = batchRequest.getResource,
-      className = batchRequest.getClassName,
-      requestName = batchRequest.getName,
-      requestConf = normalizedConf,
-      requestArgs = batchRequest.getArgs.asScala,
-      createTime = createTime,
-      engineType = batchRequest.getBatchType)
-
-    sessionManager.insertMetadata(metaData)
+    if (recoveryMetadata.isEmpty) {
+      val metaData = SessionMetadata(
+        identifier = handle.identifier.toString,
+        sessionType = sessionType,
+        // TODO: support real user
+        realUser = user,
+        username = user,
+        ipAddress = ipAddress,
+        // TODO: support to transfer fe connection url when opening session
+        kyuubiInstance = KyuubiRestFrontendService.getConnectionUrl,
+        state = OperationState.PENDING.toString,
+        resource = batchRequest.getResource,
+        className = batchRequest.getClassName,
+        requestName = batchRequest.getName,
+        requestConf = normalizedConf,
+        requestArgs = batchRequest.getArgs.asScala,
+        createTime = createTime,
+        engineType = batchRequest.getBatchType)
+
+      sessionManager.insertMetadata(metaData)
+    }
 
     // we should call super.open before running batch job submission operation
     super.open()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 957233e69..67770ed7e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -19,6 +19,9 @@ package org.apache.kyuubi.session
 
 import java.util.UUID
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
 import com.codahale.metrics.MetricRegistry
 import com.google.common.annotations.VisibleForTesting
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
@@ -32,9 +35,10 @@ import org.apache.kyuubi.credentials.HadoopCredentialsManager
 import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
-import org.apache.kyuubi.operation.KyuubiOperationManager
+import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.plugin.{PluginLoader, SessionConfAdvisor}
+import org.apache.kyuubi.server.api.v1.BatchesResource
 import org.apache.kyuubi.server.statestore.SessionStateStore
 import org.apache.kyuubi.server.statestore.api.SessionMetadata
 
@@ -109,23 +113,30 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
     }
   }
 
-  def openBatchSession(
+  private def createBatchSession(
       protocol: TProtocolVersion,
       user: String,
       password: String,
       ipAddress: String,
       conf: Map[String, String],
-      batchRequest: BatchRequest): SessionHandle = {
+      batchRequest: BatchRequest,
+      recoveryMetadata: Option[SessionMetadata] = None): KyuubiBatchSessionImpl = {
     val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
-    val batchSession = new KyuubiBatchSessionImpl(
+    new KyuubiBatchSessionImpl(
       protocol,
-      user,
+      username,
       password,
       ipAddress,
       conf,
       this,
       this.getConf.getUserDefaults(user),
-      batchRequest)
+      batchRequest,
+      recoveryMetadata)
+  }
+
+  private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSessionImpl): SessionHandle = {
+    val user = batchSession.user
+    val ipAddress = batchSession.ipAddress
     try {
       val handle = batchSession.handle
       batchSession.open()
@@ -146,11 +157,22 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
           ms.incCount(MetricRegistry.name(CONN_FAIL, user))
         }
         throw KyuubiSQLException(
-          s"Error opening batch session for $username client ip $ipAddress, due to ${e.getMessage}",
+          s"Error opening batch session for $user client ip $ipAddress, due to ${e.getMessage}",
           e)
     }
   }
 
+  def openBatchSession(
+      protocol: TProtocolVersion,
+      user: String,
+      password: String,
+      ipAddress: String,
+      conf: Map[String, String],
+      batchRequest: BatchRequest): SessionHandle = {
+    val batchSession = createBatchSession(protocol, user, password, ipAddress, conf, batchRequest)
+    openBatchSession(batchSession)
+  }
+
   def newBatchSessionHandle(protocol: TProtocolVersion): SessionHandle = {
     SessionHandle(HandleIdentifier(UUID.randomUUID(), STATIC_BATCH_SECRET_UUID), protocol)
   }
@@ -201,10 +223,50 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
       ms.registerGauge(EXEC_POOL_ALIVE, getExecPoolSize, 0)
       ms.registerGauge(EXEC_POOL_ACTIVE, getActiveCount, 0)
     }
-    // TODO: support to recover batch sessions with session state store
     super.start()
   }
 
+  def getBatchSessionsToRecover(kyuubiInstance: String): Seq[KyuubiBatchSessionImpl] = {
+    val recoveryPerBatch = conf.get(SERVER_STATE_STORE_SESSIONS_RECOVERY_PER_BATCH)
+
+    val batchSessionsToRecover = ListBuffer[KyuubiBatchSessionImpl]()
+    Seq(OperationState.PENDING, OperationState.RUNNING).foreach { stateToRecover =>
+      var offset = 0
+      var lastRecoveryNum = Int.MaxValue
+
+      while (lastRecoveryNum >= recoveryPerBatch) {
+        val metadataList = sessionStateStore.getBatchesRecoveryMetadata(
+          stateToRecover.toString,
+          kyuubiInstance,
+          offset,
+          recoveryPerBatch)
+        metadataList.foreach { metadata =>
+          val batchRequest = new BatchRequest(
+            metadata.engineType,
+            metadata.resource,
+            metadata.className,
+            metadata.requestName,
+            metadata.requestConf.asJava,
+            metadata.requestArgs.asJava)
+
+          val batchSession = createBatchSession(
+            BatchesResource.REST_BATCH_PROTOCOL,
+            metadata.username,
+            "anonymous",
+            metadata.ipAddress,
+            metadata.requestConf,
+            batchRequest,
+            Some(metadata))
+          batchSessionsToRecover += batchSession
+        }
+
+        lastRecoveryNum = metadataList.size
+        offset += lastRecoveryNum
+      }
+    }
+    batchSessionsToRecover
+  }
+
   override protected def isServer: Boolean = true
 
   private def initSessionLimiter(conf: KyuubiConf): Unit = {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index d32ee10d7..a56c3a4fd 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -32,12 +32,16 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
 import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.engine.spark.{SparkBatchProcessBuilder, SparkProcessBuilder}
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.server.KyuubiRestFrontendService
 import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
+import org.apache.kyuubi.server.statestore.api.SessionMetadata
 import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
-import org.apache.kyuubi.session.KyuubiSessionManager
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionType}
 
 class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
+  private val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
 
   override def afterEach(): Unit = {
     val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
@@ -52,7 +56,6 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
   }
 
   test("open batch session") {
-    val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
     val appName = "spark-batch-submission"
     val requestObj = new BatchRequest(
       "spark",
@@ -322,8 +325,6 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
   }
 
   test("negative request") {
-    val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
-
     // open batch session
     Seq(
       (
@@ -367,4 +368,94 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
       assert(response.readEntity(classOf[String]).contains(msg))
     }
   }
+
+  test("batch sessions recovery") {
+    val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+    val kyuubiInstance = fe.connectionUrl
+
+    assert(sessionManager.getOpenSessionCount == 0)
+    val batchId1 = UUID.randomUUID().toString
+    val batchId2 = UUID.randomUUID().toString
+
+    val batchMetadata = SessionMetadata(
+      identifier = batchId1,
+      sessionType = SessionType.BATCH,
+      realUser = "kyuubi",
+      username = "kyuubi",
+      ipAddress = "localhost",
+      kyuubiInstance = kyuubiInstance,
+      state = OperationState.PENDING.toString,
+      resource = sparkProcessBuilder.mainResource.get,
+      className = sparkProcessBuilder.mainClass,
+      requestName = "PENDING_RECOVERY",
+      requestConf = Map(
+        "spark.master" -> "local",
+        s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "3000",
+        s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
+      requestArgs = Seq.empty,
+      createTime = System.currentTimeMillis(),
+      engineType = "SPARK")
+
+    val batchMetadata2 = batchMetadata.copy(
+      identifier = batchId2,
+      requestName = "RUNNING_RECOVERY")
+    sessionManager.insertMetadata(batchMetadata)
+    sessionManager.insertMetadata(batchMetadata2)
+
+    assert(sessionManager.getBatchFromStateStore(batchId1).getState.equals("PENDING"))
+    assert(sessionManager.getBatchFromStateStore(batchId2).getState.equals("PENDING"))
+
+    val sparkBatchProcessBuilder = new SparkBatchProcessBuilder(
+      "kyuubi",
+      conf,
+      batchId2,
+      "RUNNING_RECOVERY",
+      sparkProcessBuilder.mainResource,
+      sparkProcessBuilder.mainClass,
+      batchMetadata2.requestConf,
+      batchMetadata2.requestArgs,
+      None)
+    sparkBatchProcessBuilder.start
+
+    var applicationStatus: Option[Map[String, String]] = None
+    eventually(timeout(5.seconds)) {
+      applicationStatus = sessionManager.applicationManager.getApplicationInfo(None, batchId2)
+      assert(applicationStatus.isDefined)
+    }
+
+    sessionManager.updateBatchMetadata(
+      batchId2,
+      OperationState.RUNNING,
+      applicationStatus.get)
+
+    val restFe = fe.asInstanceOf[KyuubiRestFrontendService]
+    restFe.recoverBatchSessions()
+    assert(sessionManager.getOpenSessionCount == 2)
+
+    val sessionHandle1 =
+      sessionManager.getBatchSessionHandle(batchId1, BatchesResource.REST_BATCH_PROTOCOL)
+    val sessionHandle2 =
+      sessionManager.getBatchSessionHandle(batchId2, BatchesResource.REST_BATCH_PROTOCOL)
+    val session1 = sessionManager.getSession(sessionHandle1).asInstanceOf[KyuubiBatchSessionImpl]
+    val session2 = sessionManager.getSession(sessionHandle2).asInstanceOf[KyuubiBatchSessionImpl]
+    assert(session1.createTime === batchMetadata.createTime)
+    assert(session2.createTime === batchMetadata2.createTime)
+
+    eventually(timeout(5.seconds)) {
+      assert(session1.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING)
+      assert(session1.batchJobSubmissionOp.builder.processLaunched)
+
+      assert(session2.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING)
+      assert(!session2.batchJobSubmissionOp.builder.processLaunched)
+    }
+
+    assert(sessionManager.getBatchesFromStateStore(
+      "SPARK",
+      null,
+      null,
+      0,
+      0,
+      0,
+      Int.MaxValue).size == 2)
+  }
 }