You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/06/08 02:32:01 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2373][SUB-TASK][KPIP-4] Support to recovery batch session on Kyuubi instances restart
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new e72572515 [KYUUBI #2373][SUB-TASK][KPIP-4] Support to recovery batch session on Kyuubi instances restart
e72572515 is described below
commit e725725155dc56460518e2c0e5df5ffa562aa5f8
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Wed Jun 8 10:31:50 2022 +0800
[KYUUBI #2373][SUB-TASK][KPIP-4] Support to recovery batch session on Kyuubi instances restart
### _Why are the changes needed?_
To close #2373
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2790 from turboFei/recovery_batch.
Closes #2373
81632191 [Fei Wang] trigger test
6ac6f9bd [Fei Wang] async recovery
6d9edf4e [Fei Wang] comments
daa6719f [Fei Wang] refactor
0c3a2e14 [Fei Wang] remove waitAppCompletion
04e139ee [Fei Wang] comments
b5d11d8f [Fei Wang] comment for method name
321dfa95 [Fei Wang] refactor
90be2df7 [Fei Wang] address comments
14409582 [Fei Wang] batch recovery
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: ulysses-you <ul...@apache.org>
---
docs/deployment/settings.md | 2 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 14 +++
.../scala/org/apache/kyuubi/util/ThreadUtils.scala | 7 +-
.../org/apache/kyuubi/engine/ProcBuilder.scala | 4 +
.../kyuubi/operation/BatchJobSubmission.scala | 69 +++++++++++++-
.../kyuubi/operation/KyuubiOperationManager.scala | 7 +-
.../kyuubi/server/KyuubiRestFrontendService.scala | 50 +++++++++-
.../server/statestore/SessionStateStore.scala | 18 ++++
.../kyuubi/session/KyuubiBatchSessionImpl.scala | 52 ++++++-----
.../kyuubi/session/KyuubiSessionManager.scala | 78 ++++++++++++++--
.../server/api/v1/BatchesResourceSuite.scala | 101 ++++++++++++++++++++-
11 files changed, 357 insertions(+), 45 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 78dc2f9fa..3650caa5d 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -360,6 +360,8 @@ kyuubi.server.state.store.jdbc.password||The password for server jdbc state stor
kyuubi.server.state.store.jdbc.url|jdbc:derby:memory:kyuubi_state_store_db;create=true|The jdbc url for server jdbc state store. By defaults, it is a DERBY in-memory database url, and the state information is not shared across kyuubi instances. To enable multiple kyuubi instances high available, please specify a production jdbc url.|string|1.6.0
kyuubi.server.state.store.jdbc.user||The username for server jdbc state store.|string|1.6.0
kyuubi.server.state.store.max.age|PT72H|The maximum age of state info in state store.|duration|1.6.0
+kyuubi.server.state.store.sessions.recovery.num.threads|10|The number of threads for sessions recovery from state store.|int|1.6.0
+kyuubi.server.state.store.sessions.recovery.per.batch|100|The number of sessions to recover from state store per batch.|int|1.6.0
### Session
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 7bd3879fc..179cf848b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -884,6 +884,20 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofMinutes(30).toMillis)
+  // Page size used when scanning the state store for sessions to recover. It must be
+  // positive: the recovery scan keeps querying while a page comes back full, so a page size
+  // of 0 would never terminate.
+  val SERVER_STATE_STORE_SESSIONS_RECOVERY_PER_BATCH: ConfigEntry[Int] =
+    buildConf("kyuubi.server.state.store.sessions.recovery.per.batch")
+      .doc("The number of sessions to recover from state store per batch.")
+      .version("1.6.0")
+      .intConf
+      .checkValue(_ > 0, "Must be positive")
+      .createWithDefault(100)
+
+  // Size of the fixed thread pool that re-opens recovered sessions; a fixed pool cannot be
+  // created with a non-positive number of threads.
+  val SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS: ConfigEntry[Int] =
+    buildConf("kyuubi.server.state.store.sessions.recovery.num.threads")
+      .doc("The number of threads for sessions recovery from state store.")
+      .version("1.6.0")
+      .intConf
+      .checkValue(_ > 0, "Must be positive")
+      .createWithDefault(10)
+
val ENGINE_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.backend.engine.exec.pool.wait.queue.size")
.doc("Size of the wait queue for the operation execution thread pool in SQL engine" +
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index c5a3944e6..e5552c914 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.util
-import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.Awaitable
import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -53,6 +53,11 @@ object ThreadUtils extends Logging {
executor
}
+  /**
+   * Create a fixed-size pool of `nThreads` daemon worker threads, produced by a
+   * [[NamedThreadFactory]] configured with `prefix`.
+   *
+   * @param nThreads number of worker threads in the pool
+   * @param prefix   prefix handed to the thread factory
+   * @return the pool, exposed as a [[ThreadPoolExecutor]]
+   */
+  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+    val pool = Executors.newFixedThreadPool(
+      nThreads,
+      new NamedThreadFactory(prefix, daemon = true))
+    // The JDK implementation backs a fixed pool with a ThreadPoolExecutor, so the narrowing
+    // cast exposes its size/queue accessors to callers.
+    pool.asInstanceOf[ThreadPoolExecutor]
+  }
+
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
try {
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 8734b991b..9f0aae4b8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -24,6 +24,7 @@ import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._
+import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
@@ -152,6 +153,8 @@ trait ProcBuilder {
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
private var logCaptureThread: Thread = _
private var process: Process = _
+ // Flipped to true by `start` right after the engine process is spawned. Exposed for tests,
+ // e.g. to check whether a recovered batch job was re-submitted or only monitored.
+ @VisibleForTesting
+ @volatile private[kyuubi] var processLaunched: Boolean = _
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@@ -191,6 +194,7 @@ trait ProcBuilder {
final def start: Process = synchronized {
process = processBuilder.start()
+ processLaunched = true
val reader = Files.newBufferedReader(engineLog.toPath, StandardCharsets.UTF_8)
val redirect: Runnable = { () =>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 3cb9a3ce4..308ef0be2 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -25,17 +25,20 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import com.codahale.metrics.MetricRegistry
+import com.google.common.annotations.VisibleForTesting
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse, ProcBuilder}
+import org.apache.kyuubi.engine.ApplicationOperation._
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.server.statestore.api.SessionMetadata
import org.apache.kyuubi.session.KyuubiBatchSessionImpl
import org.apache.kyuubi.util.ThriftUtils
@@ -58,7 +61,8 @@ class BatchJobSubmission(
resource: String,
className: String,
batchConf: Map[String, String],
- batchArgs: Seq[String])
+ batchArgs: Seq[String],
+ recoveryMetadata: Option[SessionMetadata])
extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
override def statement: String = "BATCH_JOB_SUBMISSION"
@@ -76,7 +80,8 @@ class BatchJobSubmission(
private var killMessage: KillResponse = (false, "UNKNOWN")
def getKillMessage: KillResponse = killMessage
- private val builder: ProcBuilder = {
+ @VisibleForTesting
+ private[kyuubi] val builder: ProcBuilder = {
Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
case Some("SPARK") =>
new SparkBatchProcessBuilder(
@@ -143,7 +148,22 @@ class BatchJobSubmission(
val asyncOperation: Runnable = () => {
setStateIfNotCanceled(OperationState.RUNNING)
try {
- submitBatchJob()
+      // Recovery mode:
+      //  - batch recorded as PENDING: it may or may not have reached the resource manager
+      //    before the previous Kyuubi instance went down. Monitor the application if its id
+      //    can be fetched; otherwise (no status at all, or a status without an app id)
+      //    re-submit the batch job.
+      //  - batch recorded in any other state: only monitor it, using the engine id stored
+      //    in the metadata.
+      // Without recovery metadata this is a fresh submission.
+      recoveryMetadata match {
+        case Some(metadata) if metadata.state == OperationState.PENDING.toString =>
+          applicationStatus = currentApplicationState
+          // `flatMap` routes "no status found" to the re-submit branch as well; a nested
+          // `Option.map` would silently do nothing in that case.
+          applicationStatus.flatMap(_.get(APP_ID_KEY)) match {
+            case Some(appId) => monitorBatchJob(appId)
+            case None => submitAndMonitorBatchJob()
+          }
+        case Some(metadata) =>
+          monitorBatchJob(metadata.engineId)
+        case None =>
+          submitAndMonitorBatchJob()
+      }
setStateIfNotCanceled(OperationState.FINISHED)
} catch {
onError()
@@ -169,10 +189,15 @@ class BatchJobSubmission(
s.contains("KILLED") || s.contains("FAILED"))
}
- private def submitBatchJob(): Unit = {
+  /**
+   * Whether the reported application state (under [[ApplicationOperation.APP_STATE_KEY]]) is
+   * one of KILLED, FAILED or FINISHED. A missing status, or a status without a state entry,
+   * counts as not terminated.
+   */
+  private def applicationTerminated(applicationStatus: Option[Map[String, String]]): Boolean = {
+    val terminalStates = Set("KILLED", "FAILED", "FINISHED")
+    applicationStatus
+      .flatMap(_.get(ApplicationOperation.APP_STATE_KEY))
+      .exists(terminalStates.contains)
+  }
+
+ private def submitAndMonitorBatchJob(): Unit = {
var appStatusFirstUpdated = false
try {
- info(s"Submitting $batchType batch job: $builder")
+ info(s"Submitting $batchType batch[$batchId] job: $builder")
val process = builder.start
applicationStatus = currentApplicationState
while (!applicationFailed(applicationStatus) && process.isAlive) {
@@ -192,12 +217,46 @@ class BatchJobSubmission(
if (process.exitValue() != 0) {
throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
}
+
+ applicationStatus.map(_.get(APP_ID_KEY)).map {
+ case Some(appId) => monitorBatchJob(appId)
+ case _ =>
+ }
}
} finally {
builder.close()
}
}
+  /**
+   * Poll the status of an already-submitted batch application every
+   * `applicationCheckInterval` until it reaches a terminal state.
+   *
+   * If no status can be fetched up front, the job is assumed to have finished. If the status
+   * disappears while polling, the loop stops and the job is likewise not treated as failed.
+   *
+   * @param appId application id, used only in log messages; polling goes through
+   *              `currentApplicationState`
+   * @throws RuntimeException if the application status reports a failure
+   */
+  private def monitorBatchJob(appId: String): Unit = {
+    // Shared by the initial and the final check so both report the failure identically.
+    def failIfApplicationFailed(): Unit = {
+      if (applicationFailed(applicationStatus)) {
+        throw new RuntimeException(s"$batchType batch[$batchId] job failed: " +
+          applicationStatus.get.mkString(","))
+      }
+    }
+
+    info(s"Monitoring submitted $batchType batch[$batchId] job: $appId")
+    if (applicationStatus.isEmpty) {
+      applicationStatus = currentApplicationState
+    }
+    if (applicationStatus.isEmpty) {
+      info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
+    } else {
+      failIfApplicationFailed()
+      // TODO: add limit for max batch job submission lifetime
+      while (applicationStatus.isDefined && !applicationTerminated(applicationStatus)) {
+        Thread.sleep(applicationCheckInterval)
+        val newApplicationStatus = currentApplicationState
+        if (newApplicationStatus != applicationStatus) {
+          applicationStatus = newApplicationStatus
+          info(s"Batch report for $batchId " +
+            applicationStatus.map(_.mkString("(", ",", ")")).getOrElse("()"))
+        }
+      }
+      failIfApplicationFailed()
+    }
+  }
+
def getOperationLogRowSet(
order: FetchOrientation,
from: Int,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index d542d0995..00a78e2f8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -26,6 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.server.statestore.api.SessionMetadata
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionImpl, Session}
import org.apache.kyuubi.util.ThriftUtils
@@ -69,7 +70,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
resource: String,
className: String,
batchConf: Map[String, String],
- batchArgs: Seq[String]): BatchJobSubmission = {
+ batchArgs: Seq[String],
+ recoveryMetadata: Option[SessionMetadata]): BatchJobSubmission = {
val operation = new BatchJobSubmission(
session,
batchType,
@@ -77,7 +79,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
resource,
className,
batchConf,
- batchArgs)
+ batchArgs,
+ recoveryMetadata)
addOperation(operation)
operation
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 3eb3ec63e..52dc93238 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -18,20 +18,24 @@
package org.apache.kyuubi.server
import java.util.EnumSet
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.Future
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import javax.servlet.DispatcherType
+import com.google.common.annotations.VisibleForTesting
import org.apache.hadoop.conf.Configuration
import org.eclipse.jetty.servlet.FilterHolder
import org.apache.kyuubi.{KyuubiException, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT}
+import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT, SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS}
import org.apache.kyuubi.server.api.v1.ApiRootResource
import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
import org.apache.kyuubi.server.ui.JettyServer
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
+import org.apache.kyuubi.session.KyuubiSessionManager
+import org.apache.kyuubi.util.ThreadUtils
/**
* A frontend service based on RESTful api via HTTP protocol.
@@ -46,6 +50,8 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
private def hadoopConf: Configuration = KyuubiServer.getHadoopConf()
+  /** The backend's session manager, narrowed to [[KyuubiSessionManager]] for batch recovery. */
+  private def sessionManager: KyuubiSessionManager =
+    be.sessionManager.asInstanceOf[KyuubiSessionManager]
+
override def initialize(conf: KyuubiConf): Unit = synchronized {
val host = conf.get(FRONTEND_REST_BIND_HOST)
.getOrElse(Utils.findLocalInetAddress.getHostAddress)
@@ -71,10 +77,50 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
server.addRedirectHandler("/docs", "/swagger")
}
+  /**
+   * Re-open the batch sessions that the state store records against this instance's
+   * `connectionUrl`.
+   *
+   * Sessions are opened concurrently on a daemon pool sized by
+   * `kyuubi.server.state.store.sessions.recovery.num.threads`. This method blocks until every
+   * submitted recovery task has terminated and always shuts the pool down. A non-fatal failure
+   * while recovering one batch is logged and does not stop the others. Fatal errors and
+   * interruption propagate to the caller.
+   */
+  @VisibleForTesting
+  private[kyuubi] def recoverBatchSessions(): Unit = {
+    import scala.util.control.NonFatal
+
+    val recoveryNumThreads = conf.get(SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS)
+    val batchRecoveryExecutor =
+      ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads, "batch-recovery-executor")
+    try {
+      val batchSessionsToRecover = sessionManager.getBatchSessionsToRecover(connectionUrl)
+      val tasks = batchSessionsToRecover.flatMap { batchSession =>
+        val batchId = batchSession.batchJobSubmissionOp.batchId
+        try {
+          val task: Future[Unit] = batchRecoveryExecutor.submit(() =>
+            Utils.tryLogNonFatalError(sessionManager.openBatchSession(batchSession)))
+          Some(task -> batchId)
+        } catch {
+          // e.g. a rejected submission; do not swallow fatal errors or interrupts.
+          case NonFatal(e) =>
+            error(s"Error while submitting batch[$batchId] for recovery", e)
+            None
+        }
+      }
+
+      // Only this thread touches the counter; it just tracks how many tasks are left.
+      val pendingRecoveryTasksCount = new AtomicInteger(tasks.size)
+      tasks.foreach { case (task, batchId) =>
+        try {
+          task.get()
+        } catch {
+          case NonFatal(e) =>
+            error(s"Error while recovering batch[$batchId]", e)
+        } finally {
+          val pendingTasks = pendingRecoveryTasksCount.decrementAndGet()
+          info(s"Batch[$batchId] recovery task terminated, current pending tasks $pendingTasks")
+        }
+      }
+    } finally {
+      ThreadUtils.shutdown(batchRecoveryExecutor)
+    }
+  }
+
override def start(): Unit = synchronized {
if (!isStarted.get) {
try {
server.start()
+ recoverBatchSessions()
isStarted.set(true)
info(s"$getName has started at ${server.getServerUri}")
startInternal()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala
index 0ef14b00c..1f60ce357 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala
@@ -83,6 +83,24 @@ class SessionStateStore extends AbstractService("SessionStateStore") {
true).map(buildBatch)
}
+ /**
+  * List the metadata of BATCH sessions in `state` that are recorded against
+  * `kyuubiInstance`, paged by `from` (offset) and `size` (page size). The session manager
+  * calls this to find batch sessions to recover.
+  *
+  * NOTE(review): the positional `null`, `0` and `false` arguments presumably mean
+  * "no filter" for the other `getMetadataList` filters and a non-descending order;
+  * confirm against the state store API.
+  */
+ def getBatchesRecoveryMetadata(
+ state: String,
+ kyuubiInstance: String,
+ from: Int,
+ size: Int): Seq[SessionMetadata] = {
+ _stateStore.getMetadataList(
+ SessionType.BATCH,
+ null,
+ null,
+ state,
+ kyuubiInstance,
+ 0,
+ 0,
+ from,
+ size,
+ false)
+ }
+
def updateBatchMetadata(
batchId: String,
state: String,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 9ffc8179b..85bfb32cb 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -40,11 +40,16 @@ class KyuubiBatchSessionImpl(
conf: Map[String, String],
override val sessionManager: KyuubiSessionManager,
val sessionConf: KyuubiConf,
- batchRequest: BatchRequest)
+ batchRequest: BatchRequest,
+ recoveryMetadata: Option[SessionMetadata] = None)
extends KyuubiSession(protocol, user, password, ipAddress, conf, sessionManager) {
override val sessionType: SessionType = SessionType.BATCH
- override val handle: SessionHandle = sessionManager.newBatchSessionHandle(protocol)
+  // A recovered session reuses the identifier persisted in its metadata, so it stays
+  // addressable by the same batch id after a restart; a new session gets a random handle.
+  override val handle: SessionHandle = recoveryMetadata match {
+    case Some(metadata) => sessionManager.getBatchSessionHandle(metadata.identifier, protocol)
+    case None => sessionManager.newBatchSessionHandle(protocol)
+  }
+
+  // A recovered session reports its original creation time rather than the recovery time.
+  override def createTime: Long = recoveryMetadata.fold(super.createTime)(_.createTime)
// TODO: Support batch conf advisor
override val normalizedConf: Map[String, String] = {
@@ -60,7 +65,8 @@ class KyuubiBatchSessionImpl(
batchRequest.getResource,
batchRequest.getClassName,
normalizedConf,
- batchRequest.getArgs.asScala)
+ batchRequest.getArgs.asScala,
+ recoveryMetadata)
private val sessionEvent = KyuubiSessionEvent(this)
EventBus.post(sessionEvent)
@@ -75,25 +81,27 @@ class KyuubiBatchSessionImpl(
ms.incCount(MetricRegistry.name(CONN_OPEN, user))
}
- val metaData = SessionMetadata(
- identifier = handle.identifier.toString,
- sessionType = sessionType,
- // TODO: support real user
- realUser = user,
- username = user,
- ipAddress = ipAddress,
- // TODO: support to transfer fe connection url when opening session
- kyuubiInstance = KyuubiRestFrontendService.getConnectionUrl,
- state = OperationState.PENDING.toString,
- resource = batchRequest.getResource,
- className = batchRequest.getClassName,
- requestName = batchRequest.getName,
- requestConf = normalizedConf,
- requestArgs = batchRequest.getArgs.asScala,
- createTime = createTime,
- engineType = batchRequest.getBatchType)
-
- sessionManager.insertMetadata(metaData)
+ if (recoveryMetadata.isEmpty) {
+ val metaData = SessionMetadata(
+ identifier = handle.identifier.toString,
+ sessionType = sessionType,
+ // TODO: support real user
+ realUser = user,
+ username = user,
+ ipAddress = ipAddress,
+ // TODO: support to transfer fe connection url when opening session
+ kyuubiInstance = KyuubiRestFrontendService.getConnectionUrl,
+ state = OperationState.PENDING.toString,
+ resource = batchRequest.getResource,
+ className = batchRequest.getClassName,
+ requestName = batchRequest.getName,
+ requestConf = normalizedConf,
+ requestArgs = batchRequest.getArgs.asScala,
+ createTime = createTime,
+ engineType = batchRequest.getBatchType)
+
+ sessionManager.insertMetadata(metaData)
+ }
// we should call super.open before running batch job submission operation
super.open()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 957233e69..67770ed7e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -19,6 +19,9 @@ package org.apache.kyuubi.session
import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
import org.apache.hive.service.rpc.thrift.TProtocolVersion
@@ -32,9 +35,10 @@ import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
-import org.apache.kyuubi.operation.KyuubiOperationManager
+import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.plugin.{PluginLoader, SessionConfAdvisor}
+import org.apache.kyuubi.server.api.v1.BatchesResource
import org.apache.kyuubi.server.statestore.SessionStateStore
import org.apache.kyuubi.server.statestore.api.SessionMetadata
@@ -109,23 +113,30 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
}
}
- def openBatchSession(
+ private def createBatchSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String],
- batchRequest: BatchRequest): SessionHandle = {
+ batchRequest: BatchRequest,
+ recoveryMetadata: Option[SessionMetadata] = None): KyuubiBatchSessionImpl = {
val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
- val batchSession = new KyuubiBatchSessionImpl(
+ new KyuubiBatchSessionImpl(
protocol,
- user,
+ username,
password,
ipAddress,
conf,
this,
this.getConf.getUserDefaults(user),
- batchRequest)
+ batchRequest,
+ recoveryMetadata)
+ }
+
+ private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSessionImpl): SessionHandle = {
+ val user = batchSession.user
+ val ipAddress = batchSession.ipAddress
try {
val handle = batchSession.handle
batchSession.open()
@@ -146,11 +157,22 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
ms.incCount(MetricRegistry.name(CONN_FAIL, user))
}
throw KyuubiSQLException(
- s"Error opening batch session for $username client ip $ipAddress, due to ${e.getMessage}",
+ s"Error opening batch session for $user client ip $ipAddress, due to ${e.getMessage}",
e)
}
}
+  /**
+   * Create a new (non-recovered) batch session for `batchRequest` and open it.
+   *
+   * @return the handle of the opened batch session
+   */
+  def openBatchSession(
+      protocol: TProtocolVersion,
+      user: String,
+      password: String,
+      ipAddress: String,
+      conf: Map[String, String],
+      batchRequest: BatchRequest): SessionHandle =
+    openBatchSession(
+      createBatchSession(protocol, user, password, ipAddress, conf, batchRequest))
+
def newBatchSessionHandle(protocol: TProtocolVersion): SessionHandle = {
SessionHandle(HandleIdentifier(UUID.randomUUID(), STATIC_BATCH_SECRET_UUID), protocol)
}
@@ -201,10 +223,50 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
ms.registerGauge(EXEC_POOL_ALIVE, getExecPoolSize, 0)
ms.registerGauge(EXEC_POOL_ACTIVE, getActiveCount, 0)
}
- // TODO: support to recover batch sessions with session state store
super.start()
}
+  /**
+   * Build, but do not open, a session for every batch whose metadata is in PENDING or
+   * RUNNING state for `kyuubiInstance`. The state store is read
+   * `kyuubi.server.state.store.sessions.recovery.per.batch` rows at a time.
+   *
+   * @param kyuubiInstance the instance (connection url) whose batches should be recovered
+   * @return the sessions to recover, PENDING ones first
+   * @throws IllegalArgumentException if the configured page size is not positive, which
+   *                                  would otherwise make the paging loop never terminate
+   */
+  def getBatchSessionsToRecover(kyuubiInstance: String): Seq[KyuubiBatchSessionImpl] = {
+    val recoveryPerBatch = conf.get(SERVER_STATE_STORE_SESSIONS_RECOVERY_PER_BATCH)
+    require(
+      recoveryPerBatch > 0,
+      s"${SERVER_STATE_STORE_SESSIONS_RECOVERY_PER_BATCH.key} must be positive, " +
+        s"but got $recoveryPerBatch")
+
+    val batchSessionsToRecover = ListBuffer[KyuubiBatchSessionImpl]()
+    Seq(OperationState.PENDING, OperationState.RUNNING).foreach { stateToRecover =>
+      var offset = 0
+      var lastRecoveryNum = Int.MaxValue
+      // A full page means more rows may follow; a short (or empty) page ends the scan.
+      while (lastRecoveryNum >= recoveryPerBatch) {
+        val metadataList = sessionStateStore.getBatchesRecoveryMetadata(
+          stateToRecover.toString,
+          kyuubiInstance,
+          offset,
+          recoveryPerBatch)
+        batchSessionsToRecover ++= metadataList.map(createRecoveredBatchSession)
+        lastRecoveryNum = metadataList.size
+        offset += lastRecoveryNum
+      }
+    }
+    batchSessionsToRecover.toList
+  }
+
+  /** Rebuild a not-yet-opened batch session from its persisted metadata. */
+  private def createRecoveredBatchSession(metadata: SessionMetadata): KyuubiBatchSessionImpl = {
+    val batchRequest = new BatchRequest(
+      metadata.engineType,
+      metadata.resource,
+      metadata.className,
+      metadata.requestName,
+      metadata.requestConf.asJava,
+      metadata.requestArgs.asJava)
+
+    createBatchSession(
+      BatchesResource.REST_BATCH_PROTOCOL,
+      metadata.username,
+      "anonymous",
+      metadata.ipAddress,
+      metadata.requestConf,
+      batchRequest,
+      Some(metadata))
+  }
+
override protected def isServer: Boolean = true
private def initSessionLimiter(conf: KyuubiConf): Unit = {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index d32ee10d7..a56c3a4fd 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -32,12 +32,16 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.engine.spark.{SparkBatchProcessBuilder, SparkProcessBuilder}
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.server.KyuubiRestFrontendService
import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
+import org.apache.kyuubi.server.statestore.api.SessionMetadata
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
-import org.apache.kyuubi.session.KyuubiSessionManager
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionType}
class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
+ // Shared by the tests in this suite (previously created locally per test); used to resolve
+ // the Spark engine's main resource and class for batch requests.
+ private val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
override def afterEach(): Unit = {
val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
@@ -52,7 +56,6 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
}
test("open batch session") {
- val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
val appName = "spark-batch-submission"
val requestObj = new BatchRequest(
"spark",
@@ -322,8 +325,6 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
}
test("negative request") {
- val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
-
// open batch session
Seq(
(
@@ -367,4 +368,94 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(response.readEntity(classOf[String]).contains(msg))
}
}
+
+ // Seeds the state store with two batches owned by this frontend instance. Batch 1 stays
+ // PENDING with no application launched; batch 2's application is launched out-of-band and
+ // its metadata marked RUNNING. Recovery should re-submit batch 1 and only monitor batch 2.
+ test("batch sessions recovery") {
+ val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+ val kyuubiInstance = fe.connectionUrl
+
+ assert(sessionManager.getOpenSessionCount == 0)
+ val batchId1 = UUID.randomUUID().toString
+ val batchId2 = UUID.randomUUID().toString
+
+ val batchMetadata = SessionMetadata(
+ identifier = batchId1,
+ sessionType = SessionType.BATCH,
+ realUser = "kyuubi",
+ username = "kyuubi",
+ ipAddress = "localhost",
+ kyuubiInstance = kyuubiInstance,
+ state = OperationState.PENDING.toString,
+ resource = sparkProcessBuilder.mainResource.get,
+ className = sparkProcessBuilder.mainClass,
+ requestName = "PENDING_RECOVERY",
+ requestConf = Map(
+ "spark.master" -> "local",
+ s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "3000",
+ s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
+ requestArgs = Seq.empty,
+ createTime = System.currentTimeMillis(),
+ engineType = "SPARK")
+
+ // Same request under a different id; it is launched below and then marked RUNNING.
+ val batchMetadata2 = batchMetadata.copy(
+ identifier = batchId2,
+ requestName = "RUNNING_RECOVERY")
+ sessionManager.insertMetadata(batchMetadata)
+ sessionManager.insertMetadata(batchMetadata2)
+
+ assert(sessionManager.getBatchFromStateStore(batchId1).getState.equals("PENDING"))
+ assert(sessionManager.getBatchFromStateStore(batchId2).getState.equals("PENDING"))
+
+ // Launch batch 2's Spark application directly, bypassing Kyuubi, to emulate a job that
+ // had already been submitted before the restart.
+ val sparkBatchProcessBuilder = new SparkBatchProcessBuilder(
+ "kyuubi",
+ conf,
+ batchId2,
+ "RUNNING_RECOVERY",
+ sparkProcessBuilder.mainResource,
+ sparkProcessBuilder.mainClass,
+ batchMetadata2.requestConf,
+ batchMetadata2.requestArgs,
+ None)
+ sparkBatchProcessBuilder.start
+
+ // Wait until the application manager can see batch 2's application.
+ var applicationStatus: Option[Map[String, String]] = None
+ eventually(timeout(5.seconds)) {
+ applicationStatus = sessionManager.applicationManager.getApplicationInfo(None, batchId2)
+ assert(applicationStatus.isDefined)
+ }
+
+ // Record batch 2 as RUNNING together with its application info.
+ sessionManager.updateBatchMetadata(
+ batchId2,
+ OperationState.RUNNING,
+ applicationStatus.get)
+
+ // Run recovery explicitly; both batches should be reopened as sessions.
+ val restFe = fe.asInstanceOf[KyuubiRestFrontendService]
+ restFe.recoverBatchSessions()
+ assert(sessionManager.getOpenSessionCount == 2)
+
+ val sessionHandle1 =
+ sessionManager.getBatchSessionHandle(batchId1, BatchesResource.REST_BATCH_PROTOCOL)
+ val sessionHandle2 =
+ sessionManager.getBatchSessionHandle(batchId2, BatchesResource.REST_BATCH_PROTOCOL)
+ val session1 = sessionManager.getSession(sessionHandle1).asInstanceOf[KyuubiBatchSessionImpl]
+ val session2 = sessionManager.getSession(sessionHandle2).asInstanceOf[KyuubiBatchSessionImpl]
+ // Recovered sessions keep the creation time stored in their metadata.
+ assert(session1.createTime === batchMetadata.createTime)
+ assert(session2.createTime === batchMetadata2.createTime)
+
+ // Batch 1 (PENDING, presumably with no application found) is re-submitted, so its own
+ // process is launched; batch 2 (RUNNING) is only monitored, so no new process is launched.
+ eventually(timeout(5.seconds)) {
+ assert(session1.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING)
+ assert(session1.batchJobSubmissionOp.builder.processLaunched)
+
+ assert(session2.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING)
+ assert(!session2.batchJobSubmissionOp.builder.processLaunched)
+ }
+
+ // Recovery must not insert duplicate metadata: still exactly two SPARK batches.
+ assert(sessionManager.getBatchesFromStateStore(
+ "SPARK",
+ null,
+ null,
+ 0,
+ 0,
+ 0,
+ Int.MaxValue).size == 2)
+ }
}