Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/28 13:14:50 UTC

[kyuubi] branch master updated: [KYUUBI #4418] Allow disable metadata operation async retry and fail fast on unrecoverable DB error

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 835454de6 [KYUUBI #4418] Allow disable metadata operation async retry and fail fast on unrecoverable DB error
835454de6 is described below

commit 835454de630c3b791623287d96829870b8052cc2
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Feb 28 21:14:36 2023 +0800

    [KYUUBI #4418] Allow disable metadata operation async retry and fail fast on unrecoverable DB error
    
    ### _Why are the changes needed?_
    
    The key changes are
    
    1. allow disabling metadata operation async retry
    2. always fail fast on "duplicated key on unique index" error
    
    Currently, when a metadata operation fails, we always retry it asynchronously to tolerate long metadata store outages without blocking the submission request. However, this cannot guarantee eventual metadata consistency: for example, when an insert violates a unique key constraint, the retry will never succeed and it blocks every following update request for the batch job. In such cases, the client gets a success response but the metadata is never updated correctly.
    
    We should distinguish between recoverable and unrecoverable errors and fail fast on the unrecoverable ones. In practice it is hard to enumerate all recoverable or unrecoverable errors, so this PR only treats the "duplicated key" error as unrecoverable and adds a switch to disable async retry entirely, so that errors propagate to the client correctly.
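    
    For reference, a minimal illustrative sketch of the keyword-based classification this PR adds (the real method is `unrecoverableDBErr` in `MetadataManager.scala`, shown in the diff below; the keyword list only covers the Derby and MySQL duplicate-key messages):
    
    ```scala
    def isUnrecoverableDBErr(cause: Throwable): Boolean = {
      // message fragments emitted by the backing databases on duplicate-key violations
      val unrecoverableKeywords = Seq(
        "duplicate key value in a unique or primary key constraint or unique index", // Derby
        "Duplicate entry") // MySQL
      // classify the error as unrecoverable if its message contains any of the keywords
      Option(cause.getMessage).exists(msg => unrecoverableKeywords.exists(msg.contains))
    }
    ```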
    
    Some configurations are renamed with the `async.` prefix (the original keys still take effect as alternatives) because we may introduce sync retry logic in the future.
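    
    As an illustration only (assuming the keys and defaults shown in the diff below), an operator who prefers fail-fast behavior could disable async retry in `kyuubi-defaults.conf`:
    
    ```properties
    # fail fast: surface metadata store errors to the client instead of retrying in background
    kyuubi.metadata.request.async.retry.enabled=false
    
    # when async retry stays enabled, the renamed keys (old names still accepted) tune it
    # kyuubi.metadata.request.async.retry.threads=10
    # kyuubi.metadata.request.async.retry.queue.size=65536
    ```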
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4418 from pan3793/sync-retry.
    
    Closes #4418
    
    ce58ac58c [Cheng Pan] revert migration-guide.md
    c2d8377a4 [Cheng Pan] simplify
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 docs/deployment/settings.md                        |   5 +-
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  37 ++--
 .../kyuubi/server/metadata/MetadataManager.scala   | 139 ++++++------
 .../kyuubi/server/metadata/MetadataRequest.scala   |   2 +-
 .../server/metadata/MetadataManagerSuite.scala     | 241 +++++++++++----------
 5 files changed, 234 insertions(+), 190 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 1b593bde1..cd8f5b770 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -302,9 +302,10 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.metadata.cleaner.interval                | PT30M                                                    | The interval to check and clean expired metadata.                                                                                                                                                                                                                                                                                                                                             [...]
 | kyuubi.metadata.max.age                         | PT72H                                                    | The maximum age of metadata, the metadata exceeding the age will be cleaned.                                                                                                                                                                                                                                                                                                                  [...]
 | kyuubi.metadata.recovery.threads                | 10                                                       | The number of threads for recovery from the metadata store when the Kyuubi server restarts.                                                                                                                                                                                                                                                                                                   [...]
+| kyuubi.metadata.request.async.retry.enabled     | true                                                     | Whether to retry in async when metadata request failed. When true, return success response immediately even the metadata request failed, and schedule it in background until success, to tolerate long-time metadata store outages w/o blocking the submission request.                                                                                                                       [...]
+| kyuubi.metadata.request.async.retry.queue.size  | 65536                                                    | The maximum queue size for buffering metadata requests in memory when the external metadata storage is down. Requests will be dropped if the queue exceeds. Only take affect when kyuubi.metadata.request.async.retry.enabled is `true`.                                                                                                                                                      [...]
+| kyuubi.metadata.request.async.retry.threads     | 10                                                       | Number of threads in the metadata request async retry manager thread pool. Only take affect when kyuubi.metadata.request.async.retry.enabled is `true`.                                                                                                                                                                                                                                       [...]
 | kyuubi.metadata.request.retry.interval          | PT5S                                                     | The interval to check and trigger the metadata request retry tasks.                                                                                                                                                                                                                                                                                                                           [...]
-| kyuubi.metadata.request.retry.queue.size        | 65536                                                    | The maximum queue size for buffering metadata requests in memory when the external metadata storage is down. Requests will be dropped if the queue exceeds.                                                                                                                                                                                                                                   [...]
-| kyuubi.metadata.request.retry.threads           | 10                                                       | Number of threads in the metadata request retry manager thread pool. The metadata store might be unavailable sometimes and the requests will fail, tolerant for this case and unblock the main thread, we support retrying the failed requests in an async way.                                                                                                                               [...]
 | kyuubi.metadata.store.class                     | org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStore | Fully qualified class name for server metadata store.                                                                                                                                                                                                                                                                                                                                         [...]
 | kyuubi.metadata.store.jdbc.database.schema.init | true                                                     | Whether to init the JDBC metadata store database schema.                                                                                                                                                                                                                                                                                                                                      [...]
 | kyuubi.metadata.store.jdbc.database.type        | DERBY                                                    | The database type for server jdbc metadata store.<ul> <li>DERBY: Apache Derby, JDBC driver `org.apache.derby.jdbc.AutoloadedDriver`.</li> <li>MYSQL: MySQL, JDBC driver `com.mysql.jdbc.Driver`.</li> <li>CUSTOM: User-defined database type, need to specify corresponding JDBC driver.</li> Note that: The JDBC datasource is powered by HiKariCP, for datasource properties, please specif [...]
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index f59cb2d98..f61cfeaa7 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1568,16 +1568,6 @@ object KyuubiConf {
       .intConf
       .createWithDefault(10)
 
-  val METADATA_REQUEST_RETRY_THREADS: ConfigEntry[Int] =
-    buildConf("kyuubi.metadata.request.retry.threads")
-      .doc("Number of threads in the metadata request retry manager thread pool. The metadata" +
-        " store might be unavailable sometimes and the requests will fail, tolerant for this" +
-        " case and unblock the main thread, we support retrying the failed requests" +
-        " in an async way.")
-      .version("1.6.0")
-      .intConf
-      .createWithDefault(10)
-
   val METADATA_REQUEST_RETRY_INTERVAL: ConfigEntry[Long] =
     buildConf("kyuubi.metadata.request.retry.interval")
       .doc("The interval to check and trigger the metadata request retry tasks.")
@@ -1585,10 +1575,31 @@ object KyuubiConf {
       .timeConf
       .createWithDefault(Duration.ofSeconds(5).toMillis)
 
-  val METADATA_REQUEST_RETRY_QUEUE_SIZE: ConfigEntry[Int] =
-    buildConf("kyuubi.metadata.request.retry.queue.size")
+  val METADATA_REQUEST_ASYNC_RETRY_ENABLED: ConfigEntry[Boolean] =
+    buildConf("kyuubi.metadata.request.async.retry.enabled")
+      .doc("Whether to retry in async when metadata request failed. When true, return " +
+        "success response immediately even the metadata request failed, and schedule " +
+        "it in background until success, to tolerate long-time metadata store outages " +
+        "w/o blocking the submission request.")
+      .version("1.7.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val METADATA_REQUEST_ASYNC_RETRY_THREADS: ConfigEntry[Int] =
+    buildConf("kyuubi.metadata.request.async.retry.threads")
+      .withAlternative("kyuubi.metadata.request.retry.threads")
+      .doc("Number of threads in the metadata request async retry manager thread pool. Only " +
+        s"take affect when ${METADATA_REQUEST_ASYNC_RETRY_ENABLED.key} is `true`.")
+      .version("1.6.0")
+      .intConf
+      .createWithDefault(10)
+
+  val METADATA_REQUEST_ASYNC_RETRY_QUEUE_SIZE: ConfigEntry[Int] =
+    buildConf("kyuubi.metadata.request.async.retry.queue.size")
+      .withAlternative("kyuubi.metadata.request.retry.queue.size")
       .doc("The maximum queue size for buffering metadata requests in memory when the external" +
-        " metadata storage is down. Requests will be dropped if the queue exceeds.")
+        " metadata storage is down. Requests will be dropped if the queue exceeds. Only" +
+        s" take affect when ${METADATA_REQUEST_ASYNC_RETRY_ENABLED.key} is `true`.")
       .version("1.6.0")
       .intConf
       .createWithDefault(65536)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index 5cecd2ab1..35abc1b30 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -37,44 +37,55 @@ class MetadataManager extends AbstractService("MetadataManager") {
   private var _metadataStore: MetadataStore = _
 
   // Visible for testing.
-  private[metadata] val identifierRequestsRetryRefs =
+  private[metadata] val identifierRequestsAsyncRetryRefs =
     new ConcurrentHashMap[String, MetadataRequestsRetryRef]()
 
   // Visible for testing.
-  private[metadata] val identifierRequestsRetryingCounts =
+  private[metadata] val identifierRequestsAsyncRetryingCounts =
     new ConcurrentHashMap[String, AtomicInteger]()
 
-  private val requestsRetryTrigger =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("metadata-requests-retry-trigger")
+  private lazy val requestsRetryInterval =
+    conf.get(KyuubiConf.METADATA_REQUEST_RETRY_INTERVAL)
 
-  private var requestsRetryExecutor: ThreadPoolExecutor = _
+  private lazy val requestsAsyncRetryEnabled =
+    conf.get(KyuubiConf.METADATA_REQUEST_ASYNC_RETRY_ENABLED)
 
-  private var maxMetadataRequestsRetryRefs: Int = _
+  private lazy val requestsAsyncRetryTrigger =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("metadata-requests-async-retry-trigger")
 
-  private val metadataCleaner =
+  private lazy val requestsAsyncRetryExecutor: ThreadPoolExecutor =
+    ThreadUtils.newDaemonFixedThreadPool(
+      conf.get(KyuubiConf.METADATA_REQUEST_ASYNC_RETRY_THREADS),
+      "metadata-requests-async-retry")
+
+  private lazy val cleanerEnabled = conf.get(KyuubiConf.METADATA_CLEANER_ENABLED)
+
+  private lazy val metadataCleaner =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("metadata-cleaner")
 
   override def initialize(conf: KyuubiConf): Unit = {
     _metadataStore = MetadataManager.createMetadataStore(conf)
-    val retryExecutorNumThreads =
-      conf.get(KyuubiConf.METADATA_REQUEST_RETRY_THREADS)
-    requestsRetryExecutor = ThreadUtils.newDaemonFixedThreadPool(
-      retryExecutorNumThreads,
-      "metadata-requests-retry-executor")
-    maxMetadataRequestsRetryRefs = conf.get(KyuubiConf.METADATA_REQUEST_RETRY_QUEUE_SIZE)
     super.initialize(conf)
   }
 
   override def start(): Unit = {
     super.start()
-    startMetadataRequestsRetryTrigger()
-    startMetadataCleaner()
+    if (requestsAsyncRetryEnabled) {
+      startMetadataRequestsAsyncRetryTrigger()
+    }
+    if (cleanerEnabled) {
+      startMetadataCleaner()
+    }
   }
 
   override def stop(): Unit = {
-    ThreadUtils.shutdown(requestsRetryTrigger)
-    ThreadUtils.shutdown(requestsRetryExecutor)
-    ThreadUtils.shutdown(metadataCleaner)
+    if (requestsAsyncRetryEnabled) {
+      ThreadUtils.shutdown(requestsAsyncRetryTrigger)
+      ThreadUtils.shutdown(requestsAsyncRetryExecutor)
+    }
+    if (cleanerEnabled) {
+      ThreadUtils.shutdown(metadataCleaner)
+    }
     _metadataStore.close()
     super.stop()
   }
@@ -93,11 +104,22 @@ class MetadataManager extends AbstractService("MetadataManager") {
     }
   }
 
-  def insertMetadata(metadata: Metadata, retryOnError: Boolean = true): Unit = {
+  protected def unrecoverableDBErr(cause: Throwable): Boolean = {
+    val unrecoverableKeywords = Seq(
+      "duplicate key value in a unique or primary key constraint or unique index", // Derby
+      "Duplicate entry" // MySQL
+    )
+    unrecoverableKeywords.exists(cause.getMessage.contains)
+  }
+
+  def insertMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
     try {
       withMetadataRequestMetrics(_metadataStore.insertMetadata(metadata))
     } catch {
-      case e: Throwable if retryOnError =>
+      // stop to retry when encounter duplicated key error.
+      case rethrow: Throwable if unrecoverableDBErr(rethrow) =>
+        throw rethrow
+      case e: Throwable if requestsAsyncRetryEnabled && asyncRetryOnError =>
         error(s"Error inserting metadata for session ${metadata.identifier}", e)
         addMetadataRetryRequest(InsertMetadata(metadata))
     }
@@ -156,11 +178,11 @@ class MetadataManager extends AbstractService("MetadataManager") {
     withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, size, true))
   }
 
-  def updateMetadata(metadata: Metadata, retryOnError: Boolean = true): Unit = {
+  def updateMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
     try {
       withMetadataRequestMetrics(_metadataStore.updateMetadata(metadata))
     } catch {
-      case e: Throwable if retryOnError =>
+      case e: Throwable if requestsAsyncRetryEnabled && asyncRetryOnError =>
         error(s"Error updating metadata for session ${metadata.identifier}", e)
         addMetadataRetryRequest(UpdateMetadata(metadata))
     }
@@ -171,35 +193,33 @@ class MetadataManager extends AbstractService("MetadataManager") {
   }
 
   private def startMetadataCleaner(): Unit = {
-    val cleanerEnabled = conf.get(KyuubiConf.METADATA_CLEANER_ENABLED)
     val stateMaxAge = conf.get(METADATA_MAX_AGE)
-
-    if (cleanerEnabled) {
-      val interval = conf.get(KyuubiConf.METADATA_CLEANER_INTERVAL)
-      val cleanerTask: Runnable = () => {
-        try {
-          withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(stateMaxAge))
-        } catch {
-          case e: Throwable => error("Error cleaning up the metadata by age", e)
-        }
+    val interval = conf.get(KyuubiConf.METADATA_CLEANER_INTERVAL)
+    val cleanerTask: Runnable = () => {
+      try {
+        withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(stateMaxAge))
+      } catch {
+        case e: Throwable => error("Error cleaning up the metadata by age", e)
       }
-
-      metadataCleaner.scheduleWithFixedDelay(
-        cleanerTask,
-        interval,
-        interval,
-        TimeUnit.MILLISECONDS)
     }
+
+    metadataCleaner.scheduleWithFixedDelay(
+      cleanerTask,
+      interval,
+      interval,
+      TimeUnit.MILLISECONDS)
   }
 
   def addMetadataRetryRequest(request: MetadataRequest): Unit = {
-    if (identifierRequestsRetryRefs.size() > maxMetadataRequestsRetryRefs) {
+    val maxRequestsAsyncRetryRefs: Int =
+      conf.get(KyuubiConf.METADATA_REQUEST_ASYNC_RETRY_QUEUE_SIZE)
+    if (identifierRequestsAsyncRetryRefs.size() > maxRequestsAsyncRetryRefs) {
       throw new KyuubiException(
         "The number of metadata requests retry instances exceeds the limitation:" +
-          maxMetadataRequestsRetryRefs)
+          maxRequestsAsyncRetryRefs)
     }
     val identifier = request.metadata.identifier
-    val ref = identifierRequestsRetryRefs.computeIfAbsent(
+    val ref = identifierRequestsAsyncRetryRefs.computeIfAbsent(
       identifier,
       identifier => {
         val ref = new MetadataRequestsRetryRef
@@ -207,30 +227,29 @@ class MetadataManager extends AbstractService("MetadataManager") {
         ref
       })
     ref.addRetryingMetadataRequest(request)
-    identifierRequestsRetryRefs.putIfAbsent(identifier, ref)
+    identifierRequestsAsyncRetryRefs.putIfAbsent(identifier, ref)
     MetricsSystem.tracing(_.markMeter(MetricsConstants.METADATA_REQUEST_RETRYING))
   }
 
   def getMetadataRequestsRetryRef(identifier: String): MetadataRequestsRetryRef = {
-    identifierRequestsRetryRefs.get(identifier)
+    identifierRequestsAsyncRetryRefs.get(identifier)
   }
 
   def deRegisterRequestsRetryRef(identifier: String): Unit = {
-    identifierRequestsRetryRefs.remove(identifier)
-    identifierRequestsRetryingCounts.remove(identifier)
+    identifierRequestsAsyncRetryRefs.remove(identifier)
+    identifierRequestsAsyncRetryingCounts.remove(identifier)
   }
 
-  private def startMetadataRequestsRetryTrigger(): Unit = {
-    val interval = conf.get(KyuubiConf.METADATA_REQUEST_RETRY_INTERVAL)
+  private def startMetadataRequestsAsyncRetryTrigger(): Unit = {
     val triggerTask = new Runnable {
       override def run(): Unit = {
-        identifierRequestsRetryRefs.forEach { (id, ref) =>
+        identifierRequestsAsyncRetryRefs.forEach { (id, ref) =>
           if (!ref.hasRemainingRequests()) {
-            identifierRequestsRetryRefs.remove(id)
-            identifierRequestsRetryingCounts.remove(id)
+            identifierRequestsAsyncRetryRefs.remove(id)
+            identifierRequestsAsyncRetryingCounts.remove(id)
           } else {
-            val retryingCount =
-              identifierRequestsRetryingCounts.computeIfAbsent(id, _ => new AtomicInteger(0))
+            val retryingCount = identifierRequestsAsyncRetryingCounts
+              .computeIfAbsent(id, _ => new AtomicInteger(0))
 
             if (retryingCount.get() == 0) {
               val retryTask = new Runnable {
@@ -241,12 +260,9 @@ class MetadataManager extends AbstractService("MetadataManager") {
                     while (request != null) {
                       request match {
                         case insert: InsertMetadata =>
-                          insertMetadata(insert.metadata, retryOnError = false)
-
+                          insertMetadata(insert.metadata, asyncRetryOnError = false)
                         case update: UpdateMetadata =>
-                          updateMetadata(update.metadata, retryOnError = false)
-
-                        case _ =>
+                          updateMetadata(update.metadata, asyncRetryOnError = false)
                       }
                       ref.metadataRequests.remove(request)
                       MetricsSystem.tracing(_.markMeter(
@@ -265,22 +281,21 @@ class MetadataManager extends AbstractService("MetadataManager") {
 
               try {
                 retryingCount.incrementAndGet()
-                requestsRetryExecutor.submit(retryTask)
+                requestsAsyncRetryExecutor.submit(retryTask)
               } catch {
                 case e: Throwable =>
                   error(s"Error submitting metadata retry requests for $id", e)
                   retryingCount.decrementAndGet()
               }
             }
-
           }
         }
       }
     }
-    requestsRetryTrigger.scheduleWithFixedDelay(
+    requestsAsyncRetryTrigger.scheduleWithFixedDelay(
       triggerTask,
-      interval,
-      interval,
+      requestsRetryInterval,
+      requestsRetryInterval,
       TimeUnit.MILLISECONDS)
   }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataRequest.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataRequest.scala
index dcee6466b..2c121edfe 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataRequest.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataRequest.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.server.metadata
 
 import org.apache.kyuubi.server.metadata.api.Metadata
 
-trait MetadataRequest {
+sealed trait MetadataRequest {
   def metadata: Metadata
 }
 
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
index d8a8af202..75c935a3d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
@@ -25,103 +25,152 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
 import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
+import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.SessionType
 
 class MetadataManagerSuite extends KyuubiFunSuite {
-  val metadataManager = new MetadataManager()
-  val metricsSystem = new MetricsSystem()
-  val conf = KyuubiConf().set(KyuubiConf.METADATA_REQUEST_RETRY_INTERVAL, 100L)
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    metricsSystem.initialize(conf)
-    metricsSystem.start()
-    metadataManager.initialize(conf)
-    metadataManager.start()
-  }
 
-  override def afterAll(): Unit = {
-    metadataManager.getBatches(null, null, null, 0, 0, 0, Int.MaxValue).foreach { batch =>
-      metadataManager.cleanupMetadataById(batch.getId)
+  test("fail fast on duplicated key") {
+    Seq("true", "false").foreach { enableAsyncRetry =>
+      withMetadataManager(Map(
+        METADATA_REQUEST_ASYNC_RETRY_ENABLED.key -> enableAsyncRetry,
+        METADATA_REQUEST_RETRY_INTERVAL.key -> "100")) { metadataManager =>
+        val metadata = newMetadata()
+        metadataManager.insertMetadata(metadata)
+        Seq(true, false).foreach { asyncRetryOnError =>
+          intercept[KyuubiException] {
+            metadataManager.insertMetadata(metadata, asyncRetryOnError)
+          }
+        }
+      }
     }
-    metadataManager.stop()
-    metricsSystem.stop()
-    super.afterAll()
   }
 
-  override protected def afterEach(): Unit = {
-    eventually(timeout(5.seconds), interval(200.milliseconds)) {
-      assert(MetricsSystem.counterValue(
-        MetricsConstants.METADATA_REQUEST_OPENED).getOrElse(0L) === 0)
+  test("async retry the metadata store requests") {
+    withMetadataManager(
+      Map(
+        METADATA_REQUEST_ASYNC_RETRY_ENABLED.key -> "true",
+        METADATA_REQUEST_RETRY_INTERVAL.key -> "100"),
+      () =>
+        new MetadataManager {
+          override protected def unrecoverableDBErr(cause: Throwable): Boolean = false
+        }) { metadataManager =>
+      val metadata = newMetadata()
+      metadataManager.insertMetadata(metadata)
+      metadataManager.insertMetadata(metadata, asyncRetryOnError = true)
+      val retryRef = metadataManager.getMetadataRequestsRetryRef(metadata.identifier)
+      val metadataToUpdate = metadata.copy(state = "RUNNING")
+      retryRef.addRetryingMetadataRequest(UpdateMetadata(metadataToUpdate))
+      eventually(timeout(3.seconds)) {
+        assert(retryRef.hasRemainingRequests())
+        assert(metadataManager.getBatch(metadata.identifier).getState === "PENDING")
+      }
+
+      val metadata2 = metadata.copy(identifier = UUID.randomUUID().toString)
+      val metadata2ToUpdate = metadata2.copy(
+        engineId = "app_id",
+        engineName = "app_name",
+        engineUrl = "app_url",
+        engineState = "app_state",
+        state = "RUNNING")
+
+      metadataManager.addMetadataRetryRequest(InsertMetadata(metadata2))
+      metadataManager.addMetadataRetryRequest(UpdateMetadata(metadata2ToUpdate))
+
+      val retryRef2 = metadataManager.getMetadataRequestsRetryRef(metadata2.identifier)
+
+      eventually(timeout(3.seconds)) {
+        assert(!retryRef2.hasRemainingRequests())
+        assert(metadataManager.getBatch(metadata2.identifier).getState === "RUNNING")
+      }
+
+      metadataManager.identifierRequestsAsyncRetryRefs.clear()
+      eventually(timeout(3.seconds)) {
+        metadataManager.identifierRequestsAsyncRetryingCounts.asScala.forall(_._2.get() == 0)
+      }
+      metadataManager.identifierRequestsAsyncRetryingCounts.clear()
     }
   }
 
-  test("retry the metadata store requests") {
-    val metadata = Metadata(
-      identifier = UUID.randomUUID().toString,
-      sessionType = SessionType.BATCH,
-      realUser = "kyuubi",
-      username = "kyuubi",
-      ipAddress = "127.0.0.1",
-      kyuubiInstance = "localhost:10009",
-      state = "PENDING",
-      resource = "intern",
-      className = "org.apache.kyuubi.SparkWC",
-      requestName = "kyuubi_batch",
-      requestConf = Map("spark.master" -> "local"),
-      requestArgs = Seq("100"),
-      createTime = System.currentTimeMillis(),
-      engineType = "spark",
-      clusterManager = Some("local"))
-    metadataManager.insertMetadata(metadata)
-    intercept[KyuubiException] {
-      metadataManager.insertMetadata(metadata, retryOnError = false)
-    }
-    metadataManager.insertMetadata(metadata, retryOnError = true)
-    val retryRef = metadataManager.getMetadataRequestsRetryRef(metadata.identifier)
-    val metadataToUpdate = metadata.copy(state = "RUNNING")
-    retryRef.addRetryingMetadataRequest(UpdateMetadata(metadataToUpdate))
-    eventually(timeout(3.seconds)) {
-      assert(retryRef.hasRemainingRequests())
-      assert(metadataManager.getBatch(metadata.identifier).getState === "PENDING")
-    }
-
-    val metadata2 = metadata.copy(identifier = UUID.randomUUID().toString)
-    val metadata2ToUpdate = metadata2.copy(
-      engineId = "app_id",
-      engineName = "app_name",
-      engineUrl = "app_url",
-      engineState = "app_state",
-      state = "RUNNING")
-
-    metadataManager.addMetadataRetryRequest(InsertMetadata(metadata2))
-    metadataManager.addMetadataRetryRequest(UpdateMetadata(metadata2ToUpdate))
-
-    val retryRef2 = metadataManager.getMetadataRequestsRetryRef(metadata2.identifier)
-
-    eventually(timeout(3.seconds)) {
-      assert(!retryRef2.hasRemainingRequests())
-      assert(metadataManager.getBatch(metadata2.identifier).getState === "RUNNING")
+  test("async metadata request metrics") {
+    withMetadataManager(Map(
+      METADATA_REQUEST_ASYNC_RETRY_ENABLED.key -> "true",
+      METADATA_REQUEST_RETRY_INTERVAL.key -> "100")) { metadataManager =>
+      val totalRequests = MetricsSystem.meterValue(METADATA_REQUEST_TOTAL).getOrElse(0L)
+      val failedRequests = MetricsSystem.meterValue(METADATA_REQUEST_FAIL).getOrElse(0L)
+      val retryingRequests = MetricsSystem.meterValue(METADATA_REQUEST_RETRYING).getOrElse(0L)
+
+      val metadata = newMetadata()
+      metadataManager.insertMetadata(metadata)
+
+      assert(
+        MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_TOTAL)
+          .getOrElse(0L) - totalRequests === 1)
+      assert(
+        MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_FAIL)
+          .getOrElse(0L) - failedRequests === 0)
+      assert(
+        MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_RETRYING)
+          .getOrElse(0L) - retryingRequests === 0)
+
+      val invalidMetadata = metadata.copy(kyuubiInstance = null)
+      intercept[Exception](metadataManager.insertMetadata(invalidMetadata, false))
+      assert(
+        MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_TOTAL)
+          .getOrElse(0L) - totalRequests === 2)
+      assert(
+        MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_FAIL)
+          .getOrElse(0L) - failedRequests === 1)
+      assert(
+        MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_RETRYING)
+          .getOrElse(0L) - retryingRequests === 0)
+
+      metadataManager.insertMetadata(invalidMetadata, true)
+
+      assert(
+        MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_TOTAL)
+          .getOrElse(0L) - totalRequests === 3)
+      assert(
+        MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_FAIL)
+          .getOrElse(0L) - failedRequests === 2)
+      assert(
+        MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_RETRYING)
+          .getOrElse(0L) - retryingRequests === 1)
     }
+  }
 
-    metadataManager.identifierRequestsRetryRefs.clear()
-    eventually(timeout(3.seconds)) {
-      metadataManager.identifierRequestsRetryingCounts.asScala.forall(_._2.get() == 0)
+  private def withMetadataManager(
+      confOverlay: Map[String, String],
+      newMetadataMgr: () => MetadataManager = () => new MetadataManager())(
+      f: MetadataManager => Unit): Unit = {
+    val metricsSystem = new MetricsSystem()
+    val metadataManager = newMetadataMgr()
+    val conf = KyuubiConf()
+    confOverlay.foreach { case (k, v) => conf.set(k, v) }
+    try {
+      metricsSystem.initialize(conf)
+      metricsSystem.start()
+      metadataManager.initialize(conf)
+      metadataManager.start()
+      f(metadataManager)
+    } finally {
+      metadataManager.getBatches(null, null, null, 0, 0, 0, Int.MaxValue).foreach { batch =>
+        metadataManager.cleanupMetadataById(batch.getId)
+      }
+      // ensure no metadata request leak
+      eventually(timeout(5.seconds), interval(200.milliseconds)) {
+        assert(MetricsSystem.counterValue(METADATA_REQUEST_OPENED).getOrElse(0L) === 0)
+      }
+      metadataManager.stop()
+      metricsSystem.stop()
     }
-    metadataManager.identifierRequestsRetryingCounts.clear()
   }
 
-  test("metadata request metrics") {
-    val totalRequests =
-      MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_TOTAL).getOrElse(0L)
-    val failedRequests =
-      MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_FAIL).getOrElse(0L)
-    val retryingRequests =
-      MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_RETRYING).getOrElse(0L)
-
-    val metadata = Metadata(
+  private def newMetadata(): Metadata = {
+    Metadata(
       identifier = UUID.randomUUID().toString,
       sessionType = SessionType.BATCH,
       realUser = "kyuubi",
@@ -137,37 +186,5 @@ class MetadataManagerSuite extends KyuubiFunSuite {
       createTime = System.currentTimeMillis(),
       engineType = "spark",
       clusterManager = Some("local"))
-    metadataManager.insertMetadata(metadata)
-
-    assert(
-      MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_TOTAL).getOrElse(
-        0L) - totalRequests === 1)
-    assert(
-      MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_FAIL).getOrElse(
-        0L) - failedRequests === 0)
-    assert(MetricsSystem.meterValue(
-      MetricsConstants.METADATA_REQUEST_RETRYING).getOrElse(0L) - retryingRequests === 0)
-
-    val invalidMetadata = metadata.copy(kyuubiInstance = null)
-    intercept[Exception](metadataManager.insertMetadata(invalidMetadata, false))
-    assert(
-      MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_TOTAL).getOrElse(
-        0L) - totalRequests === 2)
-    assert(
-      MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_FAIL).getOrElse(
-        0L) - failedRequests === 1)
-    assert(MetricsSystem.meterValue(
-      MetricsConstants.METADATA_REQUEST_RETRYING).getOrElse(0L) - retryingRequests === 0)
-
-    metadataManager.insertMetadata(invalidMetadata, true)
-
-    assert(
-      MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_TOTAL).getOrElse(
-        0L) - totalRequests === 3)
-    assert(
-      MetricsSystem.meterValue(MetricsConstants.METADATA_REQUEST_FAIL).getOrElse(
-        0L) - failedRequests === 2)
-    assert(MetricsSystem.meterValue(
-      MetricsConstants.METADATA_REQUEST_RETRYING).getOrElse(0L) - retryingRequests === 1)
   }
 }