You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/16 04:20:57 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2834] [SUB-TASK][KPIP-4] Support to retry the metadata requests on transient issue and unblock main thread

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7baf98954 [KYUUBI #2834] [SUB-TASK][KPIP-4] Support to retry the metadata requests on transient issue and unblock main thread
7baf98954 is described below

commit 7baf98954ddd45ccccd86a7c022506bb1a6f0b77
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Thu Jun 16 12:20:49 2022 +0800

    [KYUUBI #2834] [SUB-TASK][KPIP-4] Support to retry the metadata requests on transient issue and unblock main thread
    
    ### _Why are the changes needed?_
    
    We need to support retrying the JDBC requests.
    
    This is because we added a new JDBC dependency for batch HA, and the database might be in a maintenance window.
    
    There might also be a master-slave switch, during which the database might be unavailable for a few minutes.
    
    We need to tolerate this issue and unblock the main thread.
    
    BTW, this PR also refactors the metadata store naming.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2834 from turboFei/retrying_jdbc_event.
    
    Closes #2834
    
    109a63e70 [Fei Wang] fix docs
    a186484be [Fei Wang] refactor the jdbc pool name
    cfac9da53 [Fei Wang] rename metadata table name
    bb8710ba3 [Fei Wang] comments
    1197a098b [Fei Wang] add private package scope
    a95694ba9 [Fei Wang] comments
    e2f09e784 [Fei Wang] refactor
    057eb73c5 [Fei Wang] refactor
    88f3ffb6f [Fei Wang] refactor
    e0595bdee [Fei Wang] comments
    48270e869 [Fei Wang] refactor conf keys
    e4f190c5c [Fei Wang] update docs
    ed5e90e37 [Fei Wang] add max queues limitation
    c5a03c9e2 [Fei Wang] remove getOrCreate
    3e39ca94c [Fei Wang] remove
    b79fb0954 [Fei Wang] comments
    1d309cee0 [Fei Wang] comments
    9108a3f0e [Fei Wang] fix docs
    5cdffe710 [Fei Wang] fix rat
    2e7228fc7 [Fei Wang] fix ut
    520236d7b [Fei Wang] rename more
    f66d68bcc [Fei Wang] merge into metadata manager
    2f70a2f12 [Fei Wang] rename
    80e6100fb [Fei Wang] refactor
    09cad092a [Fei Wang] add ut
    ea4eea74f [Fei Wang] refactor
    6b00eb0b3 [Fei Wang] refactor to MetadataRequest
    58f1c0587 [Fei Wang] commements
    198feb53f [Fei Wang] add more docs
    670a23561 [Fei Wang] refactor
    a20d54df2 [Fei Wang] waitStateStoreRetryCompletion after super.close closes operation
    97a763aef [Fei Wang] refactor RetryingStateStoreRequest
    3e136cf34 [Fei Wang] [SUB-TASK][KPIP-4] Support to retry the session state operations if meet transient jdbc issue
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 .rat-excludes                                      |   2 +-
 docs/deployment/settings.md                        |  32 ++-
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  59 +++--
 ...a-derby.sql => metadata-store-schema-derby.sql} |  10 +-
 ...a-mysql.sql => metadata-store-schema-mysql.sql} |   4 +-
 .../kyuubi/operation/BatchJobSubmission.scala      |  21 +-
 .../kyuubi/operation/KyuubiOperationManager.scala  |   4 +-
 .../kyuubi/server/KyuubiRestFrontendService.scala  |   4 +-
 .../kyuubi/server/api/v1/BatchesResource.scala     |   8 +-
 .../kyuubi/server/metadata/MetadataManager.scala   | 293 +++++++++++++++++++++
 .../MetadataRequest.scala}                         |  12 +-
 .../MetadataRequestsRetryRef.scala}                |  14 +-
 .../MetadataStore.scala}                           |  16 +-
 .../api/Metadata.scala}                            |   4 +-
 .../jdbc/DatabaseType.scala                        |   2 +-
 .../jdbc/JDBCMetadataStore.scala}                  |  57 ++--
 .../jdbc/JDBCMetadataStoreConf.scala}              |  52 ++--
 .../jdbc/JdbcDatabaseDialect.scala                 |   2 +-
 .../server/statestore/SessionStateStore.scala      | 188 -------------
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    |  19 +-
 .../kyuubi/session/KyuubiSessionManager.scala      | 102 ++++---
 .../kyuubi/config/AllKyuubiConfiguration.scala     |   4 +-
 .../server/api/v1/BatchesResourceSuite.scala       |  30 ++-
 .../server/metadata/MetadataManagerSuite.scala     |  95 +++++++
 .../jdbc/JDBCMetadataStoreSuite.scala}             | 102 +++----
 .../kyuubi/server/rest/client/BatchCliSuite.scala  |   2 +-
 26 files changed, 699 insertions(+), 439 deletions(-)

diff --git a/.rat-excludes b/.rat-excludes
index e6f9ae8fb..7825b2cf4 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -47,4 +47,4 @@ build/scala-*/**
 **/org/apache/kyuubi/ui/static/assets/**
 **/org/apache/kyuubi/ui/swagger/**
 **/org.apache.spark.status.AppHistoryServerPlugin
-**/statestore-schema*.sql
+**/metadata-store-schema*.sql
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 7acd94c18..36980ceb5 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -336,6 +336,26 @@ kyuubi.kinit.max.attempts|10|How many times will `kinit` process retry|int|1.0.0
 kyuubi.kinit.principal|&lt;undefined&gt;|Name of the Kerberos principal.|string|1.0.0
 
 
+### Metadata
+
+Key | Default | Meaning | Type | Since
+--- | --- | --- | --- | ---
+kyuubi.metadata.cleaner.enabled|true|Whether to clean the metadata periodically. If it is enabled, Kyuubi will clean the metadata that is in terminate state with max age limitation.|boolean|1.6.0
+kyuubi.metadata.cleaner.interval|PT30M|The interval to check and clean expired metadata.|duration|1.6.0
+kyuubi.metadata.max.age|PT72H|The maximum age of metadata, the metadata that exceeds the age will be cleaned.|duration|1.6.0
+kyuubi.metadata.recovery.threads|10|The number of threads for recovery from metadata store when Kyuubi server restarting.|int|1.6.0
+kyuubi.metadata.request.retry.interval|PT5S|The interval to check and trigger the metadata request retry tasks.|duration|1.6.0
+kyuubi.metadata.request.retry.queue.size|65536|The maximum queue size for buffering metadata requests in memory when the external metadata storage is down. Requests will be dropped if the queue exceeds.|int|1.6.0
+kyuubi.metadata.request.retry.threads|10|Number of threads in the metadata request retry manager thread pool. The metadata store might be unavailable sometimes and the requests will fail, to tolerant for this case and unblock the main thread, we support to retry the failed requests in async way.|int|1.6.0
+kyuubi.metadata.store.class|org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStore|Fully qualified class name for server metadata store.|string|1.6.0
+kyuubi.metadata.store.jdbc.database.schema.init|true|Whether to init the jdbc metadata store database schema.|boolean|1.6.0
+kyuubi.metadata.store.jdbc.database.type|DERBY|The database type for server jdbc metadata store.<ul> <li>DERBY: Apache Derby, jdbc driver `org.apache.derby.jdbc.AutoloadedDriver`.</li> <li>MYSQL: MySQL, jdbc driver `com.mysql.jdbc.Driver`.</li> <li>CUSTOM: User-defined database type, need specify the jdbc driver in addition.</li> Note that: The jdbc datasource is powered by HiKariCP, for datasource properties, please specify them with prefix: kyuubi.server.metadata.store.jdbc.datasource. [...]
+kyuubi.metadata.store.jdbc.driver|&lt;undefined&gt;|JDBC driver class name for server jdbc metadata store.|string|1.6.0
+kyuubi.metadata.store.jdbc.password||The password for server jdbc metadata store.|string|1.6.0
+kyuubi.metadata.store.jdbc.url|jdbc:derby:memory:kyuubi_state_store_db;create=true|The jdbc url for server jdbc metadata store. By defaults, it is a DERBY in-memory database url, and the state information is not shared across kyuubi instances. To enable multiple kyuubi instances high available, please specify a production jdbc url.|string|1.6.0
+kyuubi.metadata.store.jdbc.user||The username for server jdbc metadata store.|string|1.6.0
+
+
 ### Metrics
 
 Key | Default | Meaning | Type | Since
@@ -379,18 +399,6 @@ kyuubi.server.limit.connections.per.user|&lt;undefined&gt;|Maximum kyuubi server
 kyuubi.server.limit.connections.per.user.ipaddress|&lt;undefined&gt;|Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect.|int|1.6.0
 kyuubi.server.name|&lt;undefined&gt;|The name of Kyuubi Server.|string|1.5.0
 kyuubi.server.redaction.regex|&lt;undefined&gt;|Regex to decide which Kyuubi contain sensitive information. When this regex matches a property key or value, the value is redacted from the various logs.||1.6.0
-kyuubi.server.state.store.class|org.apache.kyuubi.server.statestore.jdbc.JDBCStateStore|Fully qualified class name for server state store.|string|1.6.0
-kyuubi.server.state.store.cleaner.enabled|true|Whether to clean the state store periodically. If it is enabled, Kyuubi will clean the state information that is in terminate state with max age limitation.|boolean|1.6.0
-kyuubi.server.state.store.cleaner.interval|PT30M|The interval to clean state store.|duration|1.6.0
-kyuubi.server.state.store.jdbc.database.schema.init|true|Whether to init the jdbc state store database schema.|boolean|1.6.0
-kyuubi.server.state.store.jdbc.database.type|DERBY|The database type for server jdbc state store.<ul> <li>DERBY: Apache Derby, jdbc driver `org.apache.derby.jdbc.AutoloadedDriver`.</li> <li>MYSQL: MySQL, jdbc driver `com.mysql.jdbc.Driver`.</li> <li>CUSTOM: User-defined database type, need specify the jdbc driver in addition.</li> Note that: The jdbc datasource is powered by HiKariCP, for datasource properties, please specify them with prefix: kyuubi.server.state.store.jdbc.datasource. F [...]
-kyuubi.server.state.store.jdbc.driver|&lt;undefined&gt;|JDBC driver class name for server jdbc state store.|string|1.6.0
-kyuubi.server.state.store.jdbc.password||The password for server jdbc state store.|string|1.6.0
-kyuubi.server.state.store.jdbc.url|jdbc:derby:memory:kyuubi_state_store_db;create=true|The jdbc url for server jdbc state store. By defaults, it is a DERBY in-memory database url, and the state information is not shared across kyuubi instances. To enable multiple kyuubi instances high available, please specify a production jdbc url.|string|1.6.0
-kyuubi.server.state.store.jdbc.user||The username for server jdbc state store.|string|1.6.0
-kyuubi.server.state.store.max.age|PT72H|The maximum age of state info in state store.|duration|1.6.0
-kyuubi.server.state.store.sessions.recovery.num.threads|10|The number of threads for sessions recovery from state store.|int|1.6.0
-kyuubi.server.state.store.sessions.recovery.per.batch|100|The number of sessions to recover from state store per batch.|int|1.6.0
 
 
 ### Session
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 8d4521e26..0172c4cba 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1004,49 +1004,66 @@ object KyuubiConf {
       .intConf
       .createWithDefault(100)
 
-  val SERVER_STATE_STORE_CLASS: ConfigEntry[String] =
-    buildConf("kyuubi.server.state.store.class")
-      .doc("Fully qualified class name for server state store.")
+  val METADATA_STORE_CLASS: ConfigEntry[String] =
+    buildConf("kyuubi.metadata.store.class")
+      .doc("Fully qualified class name for server metadata store.")
       .version("1.6.0")
       .stringConf
-      .createWithDefault("org.apache.kyuubi.server.statestore.jdbc.JDBCStateStore")
+      .createWithDefault("org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStore")
 
-  val SERVER_STATE_STORE_CLEANER_ENABLED: ConfigEntry[Boolean] =
-    buildConf("kyuubi.server.state.store.cleaner.enabled")
-      .doc("Whether to clean the state store periodically. If it is enabled, Kyuubi will clean" +
-        " the state information that is in terminate state with max age limitation.")
+  val METADATA_CLEANER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("kyuubi.metadata.cleaner.enabled")
+      .doc("Whether to clean the metadata periodically. If it is enabled, Kyuubi will clean the" +
+        " metadata that is in terminate state with max age limitation.")
       .version("1.6.0")
       .booleanConf
       .createWithDefault(true)
 
-  val SERVER_STATE_STORE_MAX_AGE: ConfigEntry[Long] =
-    buildConf("kyuubi.server.state.store.max.age")
-      .doc("The maximum age of state info in state store.")
+  val METADATA_MAX_AGE: ConfigEntry[Long] =
+    buildConf("kyuubi.metadata.max.age")
+      .doc("The maximum age of metadata, the metadata that exceeds the age will be cleaned.")
       .version("1.6.0")
       .timeConf
       .createWithDefault(Duration.ofDays(3).toMillis)
 
-  val SERVER_STATE_STORE_CLEANER_INTERVAL: ConfigEntry[Long] =
-    buildConf("kyuubi.server.state.store.cleaner.interval")
-      .doc("The interval to clean state store.")
+  val METADATA_CLEANER_INTERVAL: ConfigEntry[Long] =
+    buildConf("kyuubi.metadata.cleaner.interval")
+      .doc("The interval to check and clean expired metadata.")
       .version("1.6.0")
       .timeConf
       .createWithDefault(Duration.ofMinutes(30).toMillis)
 
-  val SERVER_STATE_STORE_SESSIONS_RECOVERY_PER_BATCH: ConfigEntry[Int] =
-    buildConf("kyuubi.server.state.store.sessions.recovery.per.batch")
-      .doc("The number of sessions to recover from state store per batch.")
+  val METADATA_RECOVERY_THREADS: ConfigEntry[Int] =
+    buildConf("kyuubi.metadata.recovery.threads")
+      .doc("The number of threads for recovery from metadata store when Kyuubi server restarting.")
       .version("1.6.0")
       .intConf
-      .createWithDefault(100)
+      .createWithDefault(10)
 
-  val SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS: ConfigEntry[Int] =
-    buildConf("kyuubi.server.state.store.sessions.recovery.num.threads")
-      .doc("The number of threads for sessions recovery from state store.")
+  val METADATA_REQUEST_RETRY_THREADS: ConfigEntry[Int] =
+    buildConf("kyuubi.metadata.request.retry.threads")
+      .doc("Number of threads in the metadata request retry manager thread pool. The metadata" +
+        " store might be unavailable sometimes and the requests will fail, to tolerant for this" +
+        " case and unblock the main thread, we support to retry the failed requests in async way.")
       .version("1.6.0")
       .intConf
       .createWithDefault(10)
 
+  val METADATA_REQUEST_RETRY_INTERVAL: ConfigEntry[Long] =
+    buildConf("kyuubi.metadata.request.retry.interval")
+      .doc("The interval to check and trigger the metadata request retry tasks.")
+      .version("1.6.0")
+      .timeConf
+      .createWithDefault(Duration.ofSeconds(5).toMillis)
+
+  val METADATA_REQUEST_RETRY_QUEUE_SIZE: ConfigEntry[Int] =
+    buildConf("kyuubi.metadata.request.retry.queue.size")
+      .doc("The maximum queue size for buffering metadata requests in memory when the external" +
+        " metadata storage is down. Requests will be dropped if the queue exceeds.")
+      .version("1.6.0")
+      .intConf
+      .createWithDefault(65536)
+
   val ENGINE_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
     buildConf("kyuubi.backend.engine.exec.pool.wait.queue.size")
       .doc("Size of the wait queue for the operation execution thread pool in SQL engine" +
diff --git a/kyuubi-server/src/main/resources/sql/derby/statestore-schema-derby.sql b/kyuubi-server/src/main/resources/sql/derby/metadata-store-schema-derby.sql
similarity index 80%
rename from kyuubi-server/src/main/resources/sql/derby/statestore-schema-derby.sql
rename to kyuubi-server/src/main/resources/sql/derby/metadata-store-schema-derby.sql
index 4b6d92d20..c1b84fda8 100644
--- a/kyuubi-server/src/main/resources/sql/derby/statestore-schema-derby.sql
+++ b/kyuubi-server/src/main/resources/sql/derby/metadata-store-schema-derby.sql
@@ -1,6 +1,6 @@
 -- the metadata table ddl
 
-CREATE TABLE session_metadata(
+CREATE TABLE metadata(
     key_id bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, -- the auto increment key id
     identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
     session_type varchar(128) NOT NULL, -- the session type, SQL or BATCH
@@ -25,10 +25,10 @@ CREATE TABLE session_metadata(
     end_time bigint  -- the metadata end time
 );
 
-CREATE INDEX metadata_kyuubi_instance_index ON session_metadata(kyuubi_instance);
+CREATE INDEX metadata_kyuubi_instance_index ON metadata(kyuubi_instance);
 
-CREATE INDEX metadata_identifier_index ON session_metadata(identifier);
+CREATE UNIQUE INDEX metadata_unique_identifier_index ON metadata(identifier);
 
-CREATE INDEX metadata_user_name_index ON session_metadata(user_name);
+CREATE INDEX metadata_user_name_index ON metadata(user_name);
 
-CREATE INDEX metadata_engine_type_index ON session_metadata(engine_type);
+CREATE INDEX metadata_engine_type_index ON metadata(engine_type);
diff --git a/kyuubi-server/src/main/resources/sql/mysql/statestore-schema-mysql.sql b/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-mysql.sql
similarity index 95%
rename from kyuubi-server/src/main/resources/sql/mysql/statestore-schema-mysql.sql
rename to kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-mysql.sql
index 5c5979dc8..aa8e2663c 100644
--- a/kyuubi-server/src/main/resources/sql/mysql/statestore-schema-mysql.sql
+++ b/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-mysql.sql
@@ -1,6 +1,6 @@
 -- the metadata table ddl
 
-CREATE TABLE session_metadata(
+CREATE TABLE metadata(
     key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
     identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
     session_type varchar(128) NOT NULL COMMENT 'the session type, SQL or BATCH',
@@ -24,7 +24,7 @@ CREATE TABLE session_metadata(
     engine_error mediumtext COMMENT 'the engine application diagnose',
     end_time bigint COMMENT 'the metadata end time',
     INDEX kyuubi_instance_index(kyuubi_instance),
-    INDEX identifier_index(identifier),
+    UNIQUE INDEX unique_identifier_index(identifier),
     INDEX user_name_index(user_name),
     INDEX engine_type_index(engine_type)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 308ef0be2..5af11b7ab 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -38,7 +38,7 @@ import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.server.statestore.api.SessionMetadata
+import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.KyuubiBatchSessionImpl
 import org.apache.kyuubi.util.ThriftUtils
 
@@ -62,7 +62,7 @@ class BatchJobSubmission(
     className: String,
     batchConf: Map[String, String],
     batchArgs: Seq[String],
-    recoveryMetadata: Option[SessionMetadata])
+    recoveryMetadata: Option[Metadata])
   extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
 
   override def statement: String = "BATCH_JOB_SUBMISSION"
@@ -118,11 +118,18 @@ class BatchJobSubmission(
       } else {
         0L
       }
-    session.sessionManager.updateBatchMetadata(
-      batchId,
-      state,
-      applicationStatus.getOrElse(Map.empty),
-      endTime)
+
+    val engineAppStatus = applicationStatus.getOrElse(Map.empty)
+    val metadataToUpdate = Metadata(
+      identifier = batchId,
+      state = state.toString,
+      engineId = engineAppStatus.get(APP_ID_KEY).orNull,
+      engineName = engineAppStatus.get(APP_NAME_KEY).orNull,
+      engineUrl = engineAppStatus.get(APP_URL_KEY).orNull,
+      engineState = engineAppStatus.get(APP_STATE_KEY).orNull,
+      engineError = engineAppStatus.get(APP_ERROR_KEY),
+      endTime = endTime)
+    session.sessionManager.updateMetadata(metadataToUpdate)
   }
 
   override def getOperationLog: Option[OperationLog] = Option(_operationLog)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 00a78e2f8..d0c02522a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -26,7 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT
 import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
-import org.apache.kyuubi.server.statestore.api.SessionMetadata
+import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionImpl, Session}
 import org.apache.kyuubi.util.ThriftUtils
 
@@ -71,7 +71,7 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
       className: String,
       batchConf: Map[String, String],
       batchArgs: Seq[String],
-      recoveryMetadata: Option[SessionMetadata]): BatchJobSubmission = {
+      recoveryMetadata: Option[Metadata]): BatchJobSubmission = {
     val operation = new BatchJobSubmission(
       session,
       batchType,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 52dc93238..776097ca0 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -28,7 +28,7 @@ import org.eclipse.jetty.servlet.FilterHolder
 
 import org.apache.kyuubi.{KyuubiException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT, SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS}
+import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT, METADATA_RECOVERY_THREADS}
 import org.apache.kyuubi.server.api.v1.ApiRootResource
 import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
 import org.apache.kyuubi.server.ui.JettyServer
@@ -79,7 +79,7 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
 
   @VisibleForTesting
   private[kyuubi] def recoverBatchSessions(): Unit = {
-    val recoveryNumThreads = conf.get(SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS)
+    val recoveryNumThreads = conf.get(METADATA_RECOVERY_THREADS)
     val batchRecoveryExecutor =
       ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads, "batch-recovery-executor")
     try {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index bd29614db..2034be8d0 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -126,7 +126,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
     Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
       buildBatch(batchSession)
     }.getOrElse {
-      Option(sessionManager.getBatchFromStateStore(batchId)).getOrElse {
+      Option(sessionManager.getBatchFromMetadataStore(batchId)).getOrElse {
         error(s"Invalid batchId: $batchId")
         throw new NotFoundException(s"Invalid batchId: $batchId")
       }
@@ -158,7 +158,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
         s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}")
     }
     val batches =
-      sessionManager.getBatchesFromStateStore(
+      sessionManager.getBatchesFromMetadataStore(
         batchType,
         batchUser,
         batchState,
@@ -199,7 +199,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
           throw new NotFoundException(errorMsg)
       }
     }.getOrElse {
-      Option(sessionManager.getBatchSessionMetadata(batchId)).map { metadata =>
+      Option(sessionManager.getBatchMetadata(batchId)).map { metadata =>
         if (fe.connectionUrl != metadata.kyuubiInstance) {
           val internalRestClient = getInternalRestClient(metadata.kyuubiInstance)
           internalRestClient.getBatchLocalLog(userName, batchId, from, size)
@@ -246,7 +246,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       val (success, msg) = batchSession.batchJobSubmissionOp.getKillMessage
       new CloseBatchResponse(success, msg)
     }.getOrElse {
-      Option(sessionManager.getBatchSessionMetadata(batchId)).map { metadata =>
+      Option(sessionManager.getBatchMetadata(batchId)).map { metadata =>
         if (userName != metadata.username) {
           throw new NotAllowedException(
             s"$userName is not allowed to close the session belong to ${metadata.username}")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
new file mode 100644
index 000000000..dff9066a9
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.metadata
+
+import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.{KyuubiException, Logging}
+import org.apache.kyuubi.client.api.v1.dto.Batch
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.METADATA_MAX_AGE
+import org.apache.kyuubi.engine.ApplicationOperation._
+import org.apache.kyuubi.server.metadata.api.Metadata
+import org.apache.kyuubi.service.CompositeService
+import org.apache.kyuubi.session.SessionType
+import org.apache.kyuubi.util.{ClassUtils, ThreadUtils}
+
+class MetadataManager extends CompositeService("MetadataManager") {
+  private var _metadataStore: MetadataStore = _
+
+  private val identifierRequestsRetryRefs =
+    new ConcurrentHashMap[String, MetadataRequestsRetryRef]()
+
+  private val identifierRequestsRetryingCounts =
+    new ConcurrentHashMap[String, AtomicInteger]()
+
+  private val requestsRetryTrigger =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("metadata-requests-retry-trigger")
+
+  private var requestsRetryExecutor: ThreadPoolExecutor = _
+
+  private var maxMetadataRequestsRetryRefs: Int = _
+
+  private val metadataCleaner =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("metadata-cleaner")
+
+  override def initialize(conf: KyuubiConf): Unit = {
+    _metadataStore = MetadataManager.createMetadataStore(conf)
+    val retryExecutorNumThreads =
+      conf.get(KyuubiConf.METADATA_REQUEST_RETRY_THREADS)
+    requestsRetryExecutor = ThreadUtils.newDaemonFixedThreadPool(
+      retryExecutorNumThreads,
+      "metadata-requests-retry-executor")
+    maxMetadataRequestsRetryRefs = conf.get(KyuubiConf.METADATA_REQUEST_RETRY_QUEUE_SIZE)
+    super.initialize(conf)
+  }
+
+  override def start(): Unit = {
+    super.start()
+    startMetadataRequestsRetryTrigger()
+    startMetadataCleaner()
+  }
+
+  override def stop(): Unit = {
+    ThreadUtils.shutdown(requestsRetryTrigger)
+    ThreadUtils.shutdown(requestsRetryExecutor)
+    ThreadUtils.shutdown(metadataCleaner)
+    _metadataStore.close()
+    super.stop()
+  }
+
+  def insertMetadata(metadata: Metadata, retryOnError: Boolean = true): Unit = {
+    try {
+      _metadataStore.insertMetadata(metadata)
+    } catch {
+      case e: Throwable if retryOnError =>
+        error(s"Error inserting metadata for session ${metadata.identifier}", e)
+        addMetadataRetryRequest(InsertMetadata(metadata))
+    }
+  }
+
+  def getBatch(batchId: String): Batch = {
+    Option(getBatchSessionMetadata(batchId)).map(buildBatch).orNull
+  }
+
+  def getBatchSessionMetadata(batchId: String): Metadata = {
+    Option(_metadataStore.getMetadata(batchId, true)).filter(
+      _.sessionType == SessionType.BATCH).orNull
+  }
+
+  def getBatches(
+      batchType: String,
+      batchUser: String,
+      batchState: String,
+      createTime: Long,
+      endTime: Long,
+      from: Int,
+      size: Int): Seq[Batch] = {
+    _metadataStore.getMetadataList(
+      SessionType.BATCH,
+      batchType,
+      batchUser,
+      batchState,
+      null,
+      createTime,
+      endTime,
+      from,
+      size,
+      true).map(buildBatch)
+  }
+
+  def getBatchesRecoveryMetadata(
+      state: String,
+      kyuubiInstance: String,
+      from: Int,
+      size: Int): Seq[Metadata] = {
+    _metadataStore.getMetadataList(
+      SessionType.BATCH,
+      null,
+      null,
+      state,
+      kyuubiInstance,
+      0,
+      0,
+      from,
+      size,
+      false)
+  }
+
+  def updateMetadata(metadata: Metadata, retryOnError: Boolean = true): Unit = {
+    try {
+      _metadataStore.updateMetadata(metadata)
+    } catch {
+      case e: Throwable if retryOnError =>
+        error(s"Error updating metadata for session ${metadata.identifier}", e)
+        addMetadataRetryRequest(UpdateMetadata(metadata))
+    }
+  }
+
+  def cleanupMetadataById(batchId: String): Unit = {
+    _metadataStore.cleanupMetadataByIdentifier(batchId)
+  }
+
+  /**
+   * Convert a metadata record into the [[Batch]] DTO.
+   *
+   * The engine related fields are exposed as the batch application info map; entries
+   * whose value is absent (null field or empty engine error) are left out.
+   */
+  private def buildBatch(batchMetadata: Metadata): Batch = {
+    val engineInfo = Map(
+      APP_ID_KEY -> Option(batchMetadata.engineId),
+      APP_NAME_KEY -> Option(batchMetadata.engineName),
+      APP_STATE_KEY -> Option(batchMetadata.engineState),
+      APP_URL_KEY -> Option(batchMetadata.engineUrl),
+      APP_ERROR_KEY -> batchMetadata.engineError)
+    // Keep only the entries that actually carry a value.
+    val batchAppInfo = engineInfo.collect { case (key, Some(value)) => (key, value) }
+
+    new Batch(
+      batchMetadata.identifier,
+      batchMetadata.username,
+      batchMetadata.engineType,
+      batchMetadata.requestName,
+      batchAppInfo.asJava,
+      batchMetadata.kyuubiInstance,
+      batchMetadata.state,
+      batchMetadata.createTime,
+      batchMetadata.endTime)
+  }
+
+  /**
+   * When the cleaner is enabled, schedule a periodic task on `metadataCleaner` that asks
+   * the metadata store to clean up metadata by age. The cleaner interval is used both as
+   * initial delay and as delay between runs.
+   */
+  private def startMetadataCleaner(): Unit = {
+    val cleanerEnabled = conf.get(KyuubiConf.METADATA_CLEANER_ENABLED)
+    val stateMaxAge = conf.get(METADATA_MAX_AGE)
+
+    if (cleanerEnabled) {
+      val interval = conf.get(KyuubiConf.METADATA_CLEANER_INTERVAL)
+      val cleanerTask = new Runnable {
+        // Failures are only logged, so one failed run does not cancel the schedule.
+        override def run(): Unit =
+          try {
+            _metadataStore.cleanupMetadataByAge(stateMaxAge)
+          } catch {
+            case cause: Throwable => error("Error cleaning up the metadata by age", cause)
+          }
+      }
+
+      metadataCleaner.scheduleWithFixedDelay(
+        cleanerTask,
+        interval,
+        interval,
+        TimeUnit.MILLISECONDS)
+    }
+  }
+
+  /**
+   * Queue a metadata request so that it can be retried later.
+   *
+   * Requests are grouped by metadata identifier: each identifier owns one
+   * [[MetadataRequestsRetryRef]] that holds its pending requests.
+   *
+   * Note that the size check runs before the identifier is looked up, so it also rejects
+   * requests for identifiers that already have a ref.
+   *
+   * @param request the request to retry.
+   * @throws KyuubiException if the number of registered retry refs is already greater
+   *                         than `maxMetadataRequestsRetryRefs`.
+   */
+  def addMetadataRetryRequest(request: MetadataRequest): Unit = {
+    if (identifierRequestsRetryRefs.size() > maxMetadataRequestsRetryRefs) {
+      throw new KyuubiException(
+        "The number of metadata requests retry instances exceeds the limitation:" +
+          maxMetadataRequestsRetryRefs)
+    }
+    val identifier = request.metadata.identifier
+    // Reuse the ref of this identifier, or create and register a new one.
+    val ref = identifierRequestsRetryRefs.computeIfAbsent(
+      identifier,
+      identifier => {
+        val ref = new MetadataRequestsRetryRef
+        debug(s"Created MetadataRequestsRetryRef for session $identifier.")
+        ref
+      })
+    ref.addRetryingMetadataRequest(request)
+    // NOTE(review): presumably re-registers the ref in case the retry trigger, which drops
+    // refs without remaining requests, removed it between the lookup and the add above.
+    identifierRequestsRetryRefs.putIfAbsent(identifier, ref)
+  }
+
+  /**
+   * Get the retry ref registered for the given identifier.
+   *
+   * @param identifier the metadata identifier.
+   * @return the result of looking the identifier up in `identifierRequestsRetryRefs`;
+   *         presumably null when nothing is registered (the map is declared outside
+   *         this method).
+   */
+  def getMetadataRequestsRetryRef(identifier: String): MetadataRequestsRetryRef =
+    identifierRequestsRetryRefs.get(identifier)
+
+  /**
+   * Forget the retry bookkeeping of the given identifier: both its retry ref and its
+   * retrying counter are removed.
+   *
+   * @param identifier the metadata identifier.
+   */
+  def deRegisterRequestsRetryRef(identifier: String): Unit = {
+    identifierRequestsRetryRefs.remove(identifier)
+    identifierRequestsRetryingCounts.remove(identifier)
+  }
+
+  /**
+   * Schedule the periodic trigger that replays the queued metadata requests.
+   *
+   * On every run the trigger walks all registered retry refs. Refs without remaining
+   * requests are dropped together with their counter. For every other ref a retry task is
+   * submitted to `requestsRetryExecutor`, unless the counter of that identifier shows that
+   * a task is still in flight.
+   *
+   * The retry interval is used both as initial delay and as delay between runs.
+   */
+  private def startMetadataRequestsRetryTrigger(): Unit = {
+    val interval = conf.get(KyuubiConf.METADATA_REQUEST_RETRY_INTERVAL)
+    val triggerTask = new Runnable {
+      override def run(): Unit = {
+        identifierRequestsRetryRefs.forEach { (id, ref) =>
+          if (!ref.hasRemainingRequests()) {
+            // Nothing left to retry for this identifier: drop its bookkeeping.
+            identifierRequestsRetryRefs.remove(id)
+            identifierRequestsRetryingCounts.remove(id)
+          } else {
+            val retryingCount =
+              identifierRequestsRetryingCounts.computeIfAbsent(id, _ => new AtomicInteger(0))
+
+            // A non-zero count means a retry task for this identifier is still pending or
+            // running, so no second one is submitted.
+            if (retryingCount.get() == 0) {
+              val retryTask = new Runnable {
+                override def run(): Unit = {
+                  try {
+                    info(s"Retrying metadata requests for $id")
+                    // Replay from the head of the queue. A request is removed only after it
+                    // has been replayed without an exception.
+                    var request = ref.metadataRequests.peek()
+                    while (request != null) {
+                      request match {
+                        // retryOnError = false makes a failure propagate to the catch below
+                        // instead of queueing the same request again.
+                        case insert: InsertMetadata =>
+                          insertMetadata(insert.metadata, retryOnError = false)
+
+                        case update: UpdateMetadata =>
+                          updateMetadata(update.metadata, retryOnError = false)
+
+                        // Any other request type is removed without being executed.
+                        case _ =>
+                      }
+                      ref.metadataRequests.remove(request)
+                      request = ref.metadataRequests.peek()
+                    }
+                  } catch {
+                    // The failed request stays at the head of the queue for the next run.
+                    case e: Throwable =>
+                      error(s"Error retrying metadata requests for $id", e)
+                  } finally {
+                    retryingCount.decrementAndGet()
+                  }
+                }
+              }
+
+              try {
+                // Incremented before submitting; the task decrements it when it finishes.
+                retryingCount.incrementAndGet()
+                requestsRetryExecutor.submit(retryTask)
+              } catch {
+                case e: Throwable =>
+                  error(s"Error submitting metadata retry requests for $id", e)
+                  // The task never ran, so undo the increment here.
+                  retryingCount.decrementAndGet()
+              }
+            }
+
+          }
+        }
+      }
+    }
+    requestsRetryTrigger.scheduleWithFixedDelay(
+      triggerTask,
+      interval,
+      interval,
+      TimeUnit.MILLISECONDS)
+  }
+}
+
+object MetadataManager extends Logging {
+
+  /**
+   * Instantiate the metadata store implementation configured by
+   * `KyuubiConf.METADATA_STORE_CLASS`.
+   *
+   * @param conf the kyuubi conf, also handed to the created instance.
+   * @return the created metadata store.
+   * @throws KyuubiException if the configured class name is empty.
+   */
+  def createMetadataStore(conf: KyuubiConf): MetadataStore = {
+    val storeClassName = conf.get(KyuubiConf.METADATA_STORE_CLASS)
+    if (!storeClassName.isEmpty) {
+      ClassUtils.createInstance(storeClassName, classOf[MetadataStore], conf)
+    } else {
+      throw new KyuubiException(
+        s"${KyuubiConf.METADATA_STORE_CLASS.key} cannot be empty.")
+    }
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/DatabaseType.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataRequest.scala
similarity index 73%
copy from kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/DatabaseType.scala
copy to kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataRequest.scala
index aebbd6eff..dcee6466b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/DatabaseType.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataRequest.scala
@@ -15,10 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.server.statestore.jdbc
+package org.apache.kyuubi.server.metadata
 
-object DatabaseType extends Enumeration {
-  type DatabaseType = Value
+import org.apache.kyuubi.server.metadata.api.Metadata
 
-  val DERBY, MYSQL, CUSTOM = Value
+/**
+ * A request against the metadata store that can be queued for a later retry.
+ */
+trait MetadataRequest {
+  /** The metadata carried by this request. */
+  def metadata: Metadata
 }
+
+/** A request to insert the given metadata. */
+case class InsertMetadata(metadata: Metadata) extends MetadataRequest
+
+/** A request to update the given metadata. */
+case class UpdateMetadata(metadata: Metadata) extends MetadataRequest
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/DatabaseType.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataRequestsRetryRef.scala
similarity index 66%
copy from kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/DatabaseType.scala
copy to kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataRequestsRetryRef.scala
index aebbd6eff..f6fd1af07 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/DatabaseType.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataRequestsRetryRef.scala
@@ -15,10 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.server.statestore.jdbc
+package org.apache.kyuubi.server.metadata
 
-object DatabaseType extends Enumeration {
-  type DatabaseType = Value
+import java.util.concurrent.ConcurrentLinkedQueue
 
-  val DERBY, MYSQL, CUSTOM = Value
+/**
+ * Holds the metadata requests that are waiting to be retried.
+ */
+class MetadataRequestsRetryRef {
+  // The pending requests, kept in a ConcurrentLinkedQueue.
+  private[metadata] val metadataRequests = new ConcurrentLinkedQueue[MetadataRequest]()
+
+  /** Append a request to the pending queue. */
+  private[metadata] def addRetryingMetadataRequest(event: MetadataRequest): Unit = {
+    metadataRequests.add(event)
+  }
+
+  /** Whether at least one request is still pending. */
+  def hasRemainingRequests(): Boolean = !metadataRequests.isEmpty
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/StateStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
similarity index 84%
rename from kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/StateStore.scala
rename to kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
index bcf710382..07a549814 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/StateStore.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
@@ -15,19 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.server.statestore
+package org.apache.kyuubi.server.metadata
 
 import java.io.Closeable
 
-import org.apache.kyuubi.server.statestore.api.SessionMetadata
+import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.SessionType.SessionType
 
-trait StateStore extends Closeable {
+trait MetadataStore extends Closeable {
 
   /**
-   * Insert a metadata into state store.
+   * Insert a metadata into metadata store.
    */
-  def insertMetadata(metadata: SessionMetadata): Unit
+  def insertMetadata(metadata: Metadata): Unit
 
   /**
    * Get the persisted metadata by batch identifier.
@@ -35,7 +35,7 @@ trait StateStore extends Closeable {
    * @param stateOnly only return the state related column values.
    * @return selected metadata.
    */
-  def getMetadata(identifier: String, stateOnly: Boolean): SessionMetadata
+  def getMetadata(identifier: String, stateOnly: Boolean): Metadata
 
   /**
    * Get the metadata list with filter conditions, offset and size.
@@ -61,13 +61,13 @@ trait StateStore extends Closeable {
       endTime: Long,
       from: Int,
       size: Int,
-      stateOnly: Boolean): Seq[SessionMetadata]
+      stateOnly: Boolean): Seq[Metadata]
 
   /**
    * Update the metadata according to identifier.
    * Note that, it will only update the state and engine related metadata.
    */
-  def updateMetadata(metadata: SessionMetadata): Unit
+  def updateMetadata(metadata: Metadata): Unit
 
   /**
    * Cleanup meta data by identifier.
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/api/SessionMetadata.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
similarity index 97%
rename from kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/api/SessionMetadata.scala
rename to kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
index f6bb58f18..c1003cb98 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/api/SessionMetadata.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.server.statestore.api
+package org.apache.kyuubi.server.metadata.api
 
 import org.apache.kyuubi.session.SessionType.SessionType
 
@@ -48,7 +48,7 @@ import org.apache.kyuubi.session.SessionType.SessionType
  * @param engineError the engine error diagnose.
  * @param endTime the end time.
  */
-case class SessionMetadata(
+case class Metadata(
     identifier: String,
     sessionType: SessionType = null,
     realUser: String = null,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/DatabaseType.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/DatabaseType.scala
similarity index 94%
rename from kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/DatabaseType.scala
rename to kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/DatabaseType.scala
index aebbd6eff..ef93f31c5 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/DatabaseType.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/DatabaseType.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.server.statestore.jdbc
+package org.apache.kyuubi.server.metadata.jdbc
 
 object DatabaseType extends Enumeration {
   type DatabaseType = Value
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
similarity index 89%
rename from kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala
rename to kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
index 85d5f89bc..025d6316d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.server.statestore.jdbc
+package org.apache.kyuubi.server.metadata.jdbc
 
 import java.io.{BufferedReader, InputStream, InputStreamReader}
 import java.sql.{Connection, PreparedStatement, ResultSet, SQLException}
@@ -32,18 +32,18 @@ import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
 import org.apache.kyuubi.{KyuubiException, Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.operation.OperationState
-import org.apache.kyuubi.server.statestore.StateStore
-import org.apache.kyuubi.server.statestore.api.SessionMetadata
-import org.apache.kyuubi.server.statestore.jdbc.DatabaseType._
-import org.apache.kyuubi.server.statestore.jdbc.JDBCStateStoreConf._
+import org.apache.kyuubi.server.metadata.MetadataStore
+import org.apache.kyuubi.server.metadata.api.Metadata
+import org.apache.kyuubi.server.metadata.jdbc.DatabaseType._
+import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf._
 import org.apache.kyuubi.session.SessionType
 import org.apache.kyuubi.session.SessionType.SessionType
 
-class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
-  import JDBCStateStore._
+class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
+  import JDBCMetadataStore._
 
-  private val dbType = DatabaseType.withName(conf.get(SERVER_STATE_STORE_JDBC_DATABASE_TYPE))
-  private val driverClassOpt = conf.get(SERVER_STATE_STORE_JDBC_DRIVER)
+  private val dbType = DatabaseType.withName(conf.get(METADATA_STORE_JDBC_DATABASE_TYPE))
+  private val driverClassOpt = conf.get(METADATA_STORE_JDBC_DRIVER)
   private val driverClass = dbType match {
     case DERBY => driverClassOpt.getOrElse("org.apache.derby.jdbc.AutoloadedDriver")
     case MYSQL => driverClassOpt.getOrElse("com.mysql.jdbc.Driver")
@@ -57,13 +57,14 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
     case CUSTOM => new GenericDatabaseDialect
   }
 
-  private val datasourceProperties = JDBCStateStoreConf.getStateStoreJDBCDataSourceProperties(conf)
+  private val datasourceProperties =
+    JDBCMetadataStoreConf.getMetadataStoreJDBCDataSourceProperties(conf)
   private val hikariConfig = new HikariConfig(datasourceProperties)
   hikariConfig.setDriverClassName(driverClass)
-  hikariConfig.setJdbcUrl(conf.get(SERVER_STATE_STORE_JDBC_URL))
-  hikariConfig.setUsername(conf.get(SERVER_STATE_STORE_JDBC_USER))
-  hikariConfig.setPassword(conf.get(SERVER_STATE_STORE_JDBC_PASSWORD))
-  hikariConfig.setPoolName("kyuubi-state-store-pool")
+  hikariConfig.setJdbcUrl(conf.get(METADATA_STORE_JDBC_URL))
+  hikariConfig.setUsername(conf.get(METADATA_STORE_JDBC_USER))
+  hikariConfig.setPassword(conf.get(METADATA_STORE_JDBC_PASSWORD))
+  hikariConfig.setPoolName("jdbc-metadata-store-pool")
 
   @VisibleForTesting
   private[kyuubi] val hikariDataSource = new HikariDataSource(hikariConfig)
@@ -72,7 +73,7 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
   private val terminalStates =
     OperationState.terminalStates.map(x => s"'${x.toString}'").mkString(", ")
 
-  if (conf.get(SERVER_STATE_STORE_JDBC_DATABASE_SCHEMA_INIT)) {
+  if (conf.get(METADATA_STORE_JDBC_DATABASE_SCHEMA_INIT)) {
     initSchema()
   }
 
@@ -80,9 +81,9 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
     val classLoader = getClass.getClassLoader
     val initSchemaStream: Option[InputStream] = dbType match {
       case DERBY =>
-        Option(classLoader.getResourceAsStream("sql/derby/statestore-schema-derby.sql"))
+        Option(classLoader.getResourceAsStream("sql/derby/metadata-store-schema-derby.sql"))
       case MYSQL =>
-        Option(classLoader.getResourceAsStream("sql/mysql/statestore-schema-mysql.sql"))
+        Option(classLoader.getResourceAsStream("sql/mysql/metadata-store-schema-mysql.sql"))
       case CUSTOM => None
     }
     initSchemaStream.foreach { inputStream =>
@@ -107,7 +108,7 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
     hikariDataSource.close()
   }
 
-  override def insertMetadata(metadata: SessionMetadata): Unit = {
+  override def insertMetadata(metadata: Metadata): Unit = {
     val query =
       s"""
          |INSERT INTO $METADATA_TABLE(
@@ -152,7 +153,7 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
     }
   }
 
-  override def getMetadata(identifier: String, stateOnly: Boolean): SessionMetadata = {
+  override def getMetadata(identifier: String, stateOnly: Boolean): Metadata = {
     val query =
       if (stateOnly) {
         s"SELECT $METADATA_STATE_ONLY_COLUMNS FROM $METADATA_TABLE WHERE identifier = ?"
@@ -177,7 +178,7 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
       endTime: Long,
       from: Int,
       size: Int,
-      stateOnly: Boolean): Seq[SessionMetadata] = {
+      stateOnly: Boolean): Seq[Metadata] = {
     val queryBuilder = new StringBuilder
     val params = ListBuffer[Any]()
     if (stateOnly) {
@@ -227,7 +228,7 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
     }
   }
 
-  override def updateMetadata(metadata: SessionMetadata): Unit = {
+  override def updateMetadata(metadata: Metadata): Unit = {
     val queryBuilder = new StringBuilder
     val params = ListBuffer[Any]()
 
@@ -288,9 +289,9 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
     }
   }
 
-  private def buildMetadata(resultSet: ResultSet, stateOnly: Boolean): Seq[SessionMetadata] = {
+  private def buildMetadata(resultSet: ResultSet, stateOnly: Boolean): Seq[Metadata] = {
     try {
-      val metadataList = ListBuffer[SessionMetadata]()
+      val metadataList = ListBuffer[Metadata]()
       while (resultSet.next()) {
         val identifier = resultSet.getString("identifier")
         val sessionType = SessionType.withName(resultSet.getString("session_type"))
@@ -321,7 +322,7 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
           requestConf = string2Map(resultSet.getString("request_conf"))
           requestArgs = string2Seq(resultSet.getString("request_args"))
         }
-        val metadata = SessionMetadata(
+        val metadata = Metadata(
           identifier = identifier,
           sessionType = sessionType,
           realUser = realUser,
@@ -360,7 +361,7 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
       statement.execute()
     } catch {
       case e: SQLException =>
-        throw new KyuubiException(e.getMessage, e)
+        throw new KyuubiException(s"Error executing $sql:" + e.getMessage, e)
     } finally {
       if (statement != null) {
         Utils.tryLogNonFatalError(statement.close())
@@ -418,7 +419,7 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
         throw new KyuubiException(e.getMessage, e)
     } finally {
       if (connection != null) {
-        connection.close()
+        Utils.tryLogNonFatalError(connection.close())
       }
     }
   }
@@ -444,8 +445,8 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
   }
 }
 
-object JDBCStateStore {
-  private val METADATA_TABLE = "session_metadata"
+object JDBCMetadataStore {
+  private val METADATA_TABLE = "metadata"
   private val METADATA_STATE_ONLY_COLUMNS = Seq(
     "identifier",
     "session_type",
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStoreConf.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala
similarity index 56%
rename from kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStoreConf.scala
rename to kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala
index 2c5bdefb8..bbc474c92 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStoreConf.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala
@@ -15,73 +15,73 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.server.statestore.jdbc
+package org.apache.kyuubi.server.metadata.jdbc
 
 import java.util.{Locale, Properties}
 
 import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
 
-object JDBCStateStoreConf {
-  final val STATE_STORE_JDBC_DATASOURCE_PREFIX = "kyuubi.server.state.store.jdbc.datasource"
+object JDBCMetadataStoreConf {
+  final val METADATA_STORE_JDBC_DATASOURCE_PREFIX = "kyuubi.metadata.store.jdbc.datasource"
 
   private def buildConf(key: String): ConfigBuilder = KyuubiConf.buildConf(key)
 
-  /** Get state store jdbc datasource properties. */
-  def getStateStoreJDBCDataSourceProperties(conf: KyuubiConf): Properties = {
+  /** Get metadata store jdbc datasource properties. */
+  def getMetadataStoreJDBCDataSourceProperties(conf: KyuubiConf): Properties = {
     val datasourceProperties = new Properties()
-    conf.getAllWithPrefix(STATE_STORE_JDBC_DATASOURCE_PREFIX, "").foreach { case (key, value) =>
+    conf.getAllWithPrefix(METADATA_STORE_JDBC_DATASOURCE_PREFIX, "").foreach { case (key, value) =>
       datasourceProperties.put(key, value)
     }
     datasourceProperties
   }
 
-  val SERVER_STATE_STORE_JDBC_DATABASE_TYPE: ConfigEntry[String] =
-    buildConf("kyuubi.server.state.store.jdbc.database.type")
-      .doc("The database type for server jdbc state store.<ul>" +
+  // NOTE: the datasource prefix quoted in the doc text below has to match
+  // METADATA_STORE_JDBC_DATASOURCE_PREFIX, otherwise users set properties that are ignored.
+  val METADATA_STORE_JDBC_DATABASE_TYPE: ConfigEntry[String] =
+    buildConf("kyuubi.metadata.store.jdbc.database.type")
+      .doc("The database type for server jdbc metadata store.<ul>" +
         " <li>DERBY: Apache Derby, jdbc driver `org.apache.derby.jdbc.AutoloadedDriver`.</li>" +
         " <li>MYSQL: MySQL, jdbc driver `com.mysql.jdbc.Driver`.</li>" +
         " <li>CUSTOM: User-defined database type, need specify the jdbc driver in addition.</li>" +
         " Note that: The jdbc datasource is powered by HiKariCP, for datasource properties," +
-        " please specify them with prefix: kyuubi.server.state.store.jdbc.datasource." +
-        " For example, kyuubi.server.state.store.jdbc.datasource.connectionTimeout=10000.")
+        " please specify them with prefix: kyuubi.metadata.store.jdbc.datasource." +
+        " For example, kyuubi.metadata.store.jdbc.datasource.connectionTimeout=10000.")
       .version("1.6.0")
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .createWithDefault("DERBY")
 
-  val SERVER_STATE_STORE_JDBC_DATABASE_SCHEMA_INIT: ConfigEntry[Boolean] =
-    buildConf("kyuubi.server.state.store.jdbc.database.schema.init")
-      .doc("Whether to init the jdbc state store database schema.")
+  val METADATA_STORE_JDBC_DATABASE_SCHEMA_INIT: ConfigEntry[Boolean] =
+    buildConf("kyuubi.metadata.store.jdbc.database.schema.init")
+      .doc("Whether to init the jdbc metadata store database schema.")
       .version("1.6.0")
       .booleanConf
       .createWithDefault(true)
 
-  val SERVER_STATE_STORE_JDBC_DRIVER: OptionalConfigEntry[String] =
-    buildConf("kyuubi.server.state.store.jdbc.driver")
-      .doc("JDBC driver class name for server jdbc state store.")
+  val METADATA_STORE_JDBC_DRIVER: OptionalConfigEntry[String] =
+    buildConf("kyuubi.metadata.store.jdbc.driver")
+      .doc("JDBC driver class name for server jdbc metadata store.")
       .version("1.6.0")
       .stringConf
       .createOptional
 
-  val SERVER_STATE_STORE_JDBC_URL: ConfigEntry[String] =
-    buildConf("kyuubi.server.state.store.jdbc.url")
-      .doc("The jdbc url for server jdbc state store. By defaults, it is a DERBY in-memory" +
+  val METADATA_STORE_JDBC_URL: ConfigEntry[String] =
+    buildConf("kyuubi.metadata.store.jdbc.url")
+      .doc("The jdbc url for server jdbc metadata store. By defaults, it is a DERBY in-memory" +
         " database url, and the state information is not shared across kyuubi instances. To" +
         " enable multiple kyuubi instances high available, please specify a production jdbc url.")
       .version("1.6.0")
       .stringConf
       .createWithDefault("jdbc:derby:memory:kyuubi_state_store_db;create=true")
 
-  val SERVER_STATE_STORE_JDBC_USER: ConfigEntry[String] =
-    buildConf("kyuubi.server.state.store.jdbc.user")
-      .doc("The username for server jdbc state store.")
+  val METADATA_STORE_JDBC_USER: ConfigEntry[String] =
+    buildConf("kyuubi.metadata.store.jdbc.user")
+      .doc("The username for server jdbc metadata store.")
       .version("1.6.0")
       .stringConf
       .createWithDefault("")
 
-  val SERVER_STATE_STORE_JDBC_PASSWORD: ConfigEntry[String] =
-    buildConf("kyuubi.server.state.store.jdbc.password")
-      .doc("The password for server jdbc state store.")
+  val METADATA_STORE_JDBC_PASSWORD: ConfigEntry[String] =
+    buildConf("kyuubi.metadata.store.jdbc.password")
+      .doc("The password for server jdbc metadata store.")
       .version("1.6.0")
       .stringConf
       .createWithDefault("")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JdbcDatabaseDialect.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
similarity index 96%
rename from kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JdbcDatabaseDialect.scala
rename to kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
index f51736843..837af77cf 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JdbcDatabaseDialect.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.server.statestore.jdbc
+package org.apache.kyuubi.server.metadata.jdbc
 
 trait JdbcDatabaseDialect {
   def addLimitAndOffsetToQuery(sql: String, limit: Int, offset: Int): String
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala
deleted file mode 100644
index 2ea336f0d..000000000
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.server.statestore
-
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-
-import org.apache.kyuubi.{KyuubiException, Logging}
-import org.apache.kyuubi.client.api.v1.dto.Batch
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.SERVER_STATE_STORE_MAX_AGE
-import org.apache.kyuubi.engine.ApplicationOperation._
-import org.apache.kyuubi.server.statestore.api.SessionMetadata
-import org.apache.kyuubi.service.AbstractService
-import org.apache.kyuubi.session.SessionType
-import org.apache.kyuubi.util.{ClassUtils, ThreadUtils}
-
-class SessionStateStore extends AbstractService("SessionStateStore") {
-  private var _stateStore: StateStore = _
-
-  private val stateStoreCleaner =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("session-state-store-cleaner")
-
-  override def initialize(conf: KyuubiConf): Unit = {
-    this.conf = conf
-    _stateStore = SessionStateStore.createStateStore(conf)
-    super.initialize(conf)
-  }
-
-  override def start(): Unit = {
-    super.start()
-    startStateStoreCleaner()
-  }
-
-  override def stop(): Unit = {
-    ThreadUtils.shutdown(stateStoreCleaner)
-    _stateStore.close()
-    super.stop()
-  }
-
-  def insertMetadata(metadata: SessionMetadata): Unit = {
-    _stateStore.insertMetadata(metadata)
-  }
-
-  def getBatch(batchId: String): Batch = {
-    Option(getBatchSessionMetadata(batchId)).map(buildBatch).orNull
-  }
-
-  def getBatchSessionMetadata(batchId: String): SessionMetadata = {
-    Option(_stateStore.getMetadata(batchId, true)).filter(_.sessionType == SessionType.BATCH).orNull
-  }
-
-  def getBatches(
-      batchType: String,
-      batchUser: String,
-      batchState: String,
-      createTime: Long,
-      endTime: Long,
-      from: Int,
-      size: Int): Seq[Batch] = {
-    _stateStore.getMetadataList(
-      SessionType.BATCH,
-      batchType,
-      batchUser,
-      batchState,
-      null,
-      createTime,
-      endTime,
-      from,
-      size,
-      true).map(buildBatch)
-  }
-
-  def getBatchesRecoveryMetadata(
-      state: String,
-      kyuubiInstance: String,
-      from: Int,
-      size: Int): Seq[SessionMetadata] = {
-    _stateStore.getMetadataList(
-      SessionType.BATCH,
-      null,
-      null,
-      state,
-      kyuubiInstance,
-      0,
-      0,
-      from,
-      size,
-      false)
-  }
-
-  def updateBatchMetadata(
-      batchId: String,
-      state: String,
-      batchAppStatus: Map[String, String],
-      endTime: Long): Unit = {
-    val appId = batchAppStatus.get(APP_ID_KEY).orNull
-    val appName = batchAppStatus.get(APP_NAME_KEY).orNull
-    val appUrl = batchAppStatus.get(APP_URL_KEY).orNull
-    val appState = batchAppStatus.get(APP_STATE_KEY).orNull
-    val appError = batchAppStatus.get(APP_ERROR_KEY)
-    val metadata = SessionMetadata(
-      identifier = batchId,
-      state = state,
-      engineId = appId,
-      engineName = appName,
-      engineUrl = appUrl,
-      engineState = appState,
-      engineError = appError,
-      endTime = endTime)
-    _stateStore.updateMetadata(metadata)
-  }
-
-  def cleanupMetadataById(batchId: String): Unit = {
-    _stateStore.cleanupMetadataByIdentifier(batchId)
-  }
-
-  private def buildBatch(batchMetadata: SessionMetadata): Batch = {
-    val batchAppInfo = Map(
-      APP_ID_KEY -> Option(batchMetadata.engineId),
-      APP_NAME_KEY -> Option(batchMetadata.engineName),
-      APP_STATE_KEY -> Option(batchMetadata.engineState),
-      APP_URL_KEY -> Option(batchMetadata.engineUrl),
-      APP_ERROR_KEY -> batchMetadata.engineError)
-      .filter(_._2.isDefined)
-      .map(info => (info._1, info._2.get))
-
-    new Batch(
-      batchMetadata.identifier,
-      batchMetadata.username,
-      batchMetadata.engineType,
-      batchMetadata.requestName,
-      batchAppInfo.asJava,
-      batchMetadata.kyuubiInstance,
-      batchMetadata.state,
-      batchMetadata.createTime,
-      batchMetadata.endTime)
-  }
-
-  private def startStateStoreCleaner(): Unit = {
-    val cleanerEnabled = conf.get(KyuubiConf.SERVER_STATE_STORE_CLEANER_ENABLED)
-    val stateMaxAge = conf.get(SERVER_STATE_STORE_MAX_AGE)
-
-    if (cleanerEnabled) {
-      val interval = conf.get(KyuubiConf.SERVER_STATE_STORE_CLEANER_INTERVAL)
-      val cleanerTask: Runnable = () => {
-        try {
-          _stateStore.cleanupMetadataByAge(stateMaxAge)
-        } catch {
-          case e: Throwable => error("Error cleaning up the metadata by age", e)
-        }
-      }
-
-      stateStoreCleaner.scheduleWithFixedDelay(
-        cleanerTask,
-        interval,
-        interval,
-        TimeUnit.MILLISECONDS)
-    }
-  }
-}
-
-object SessionStateStore extends Logging {
-  def createStateStore(conf: KyuubiConf): StateStore = {
-    val className = conf.get(KyuubiConf.SERVER_STATE_STORE_CLASS)
-    if (className.isEmpty) {
-      throw new KyuubiException(
-        s"${KyuubiConf.SERVER_STATE_STORE_CLASS.key} cannot be empty.")
-    }
-    ClassUtils.createInstance(className, classOf[StateStore], conf)
-  }
-}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 47b96a80e..d214d5f17 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.metrics.MetricsConstants.{CONN_OPEN, CONN_TOTAL}
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.KyuubiRestFrontendService
-import org.apache.kyuubi.server.statestore.api.SessionMetadata
+import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.SessionType.SessionType
 
 class KyuubiBatchSessionImpl(
@@ -42,7 +42,7 @@ class KyuubiBatchSessionImpl(
     override val sessionManager: KyuubiSessionManager,
     val sessionConf: KyuubiConf,
     batchRequest: BatchRequest,
-    recoveryMetadata: Option[SessionMetadata] = None)
+    recoveryMetadata: Option[Metadata] = None)
   extends KyuubiSession(
     TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
     user,
@@ -75,6 +75,18 @@ class KyuubiBatchSessionImpl(
       batchRequest.getArgs.asScala,
       recoveryMetadata)
 
+  /**
+   * Blocks the calling thread until the metadata requests retry ref registered for this
+   * batch, if any, reports no remaining requests, then de-registers that ref from the
+   * session manager. Does nothing when no retry ref is registered for the batch id.
+   */
+  private def waitMetadataRequestsRetryCompletion(): Unit = {
+    val batchId = batchJobSubmissionOp.batchId
+    // The session manager returns an Option, so foreach is a no-op when no ref exists.
+    sessionManager.getMetadataRequestsRetryRef(batchId).foreach {
+      metadataRequestsRetryRef =>
+        // Poll every 300 ms and log on each iteration.
+        // NOTE(review): the loop has no upper bound, so the caller waits for as long as
+        // requests remain - confirm this is intended.
+        while (metadataRequestsRetryRef.hasRemainingRequests()) {
+          info(s"There are still remaining metadata store requests for batch[$batchId]")
+          Thread.sleep(300)
+        }
+        // Only reached once the ref reports no remaining requests.
+        sessionManager.deRegisterMetadataRequestsRetryRef(batchId)
+    }
+  }
+
   private val sessionEvent = KyuubiSessionEvent(this)
   EventBus.post(sessionEvent)
 
@@ -89,7 +101,7 @@ class KyuubiBatchSessionImpl(
     }
 
     if (recoveryMetadata.isEmpty) {
-      val metaData = SessionMetadata(
+      val metaData = Metadata(
         identifier = handle.identifier.toString,
         sessionType = sessionType,
         // TODO: support real user
@@ -119,6 +131,7 @@ class KyuubiBatchSessionImpl(
 
   override def close(): Unit = {
     super.close()
+    waitMetadataRequestsRetryCompletion()
     sessionEvent.endTime = System.currentTimeMillis()
     EventBus.post(sessionEvent)
     MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 94c3e9b35..ce11a0ad7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -18,7 +18,6 @@
 package org.apache.kyuubi.session
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 
 import com.codahale.metrics.MetricRegistry
 import com.google.common.annotations.VisibleForTesting
@@ -33,10 +32,9 @@ import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
-import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.plugin.{PluginLoader, SessionConfAdvisor}
-import org.apache.kyuubi.server.statestore.SessionStateStore
-import org.apache.kyuubi.server.statestore.api.SessionMetadata
+import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef}
+import org.apache.kyuubi.server.metadata.api.Metadata
 
 class KyuubiSessionManager private (name: String) extends SessionManager(name) {
 
@@ -47,14 +45,14 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
   // this lazy is must be specified since the conf is null when the class initialization
   lazy val sessionConfAdvisor: SessionConfAdvisor = PluginLoader.loadSessionConfAdvisor(conf)
   val applicationManager = new KyuubiApplicationManager()
-  private lazy val sessionStateStore = new SessionStateStore()
+  private val metadataManager = new MetadataManager()
 
   private var limiter: Option[SessionLimiter] = None
 
   override def initialize(conf: KyuubiConf): Unit = {
     addService(applicationManager)
     addService(credentialsManager)
-    addService(sessionStateStore)
+    addService(metadataManager)
     initSessionLimiter(conf)
     super.initialize(conf)
   }
@@ -114,7 +112,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
       ipAddress: String,
       conf: Map[String, String],
       batchRequest: BatchRequest,
-      recoveryMetadata: Option[SessionMetadata] = None): KyuubiBatchSessionImpl = {
+      recoveryMetadata: Option[Metadata] = None): KyuubiBatchSessionImpl = {
     val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
     new KyuubiBatchSessionImpl(
       username,
@@ -169,23 +167,27 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
     getSessionOption(sessionHandle).map(_.asInstanceOf[KyuubiBatchSessionImpl]).orNull
   }
 
-  def insertMetadata(metadata: SessionMetadata): Unit = {
-    sessionStateStore.insertMetadata(metadata)
+  def insertMetadata(metadata: Metadata): Unit = {
+    metadataManager.insertMetadata(metadata)
   }
 
-  def updateBatchMetadata(
-      batchId: String,
-      state: OperationState,
-      applicationStatus: Map[String, String],
-      endTime: Long = 0L): Unit = {
-    sessionStateStore.updateBatchMetadata(batchId, state.toString, applicationStatus, endTime)
+  def updateMetadata(metadata: Metadata): Unit = {
+    metadataManager.updateMetadata(metadata)
   }
 
-  def getBatchFromStateStore(batchId: String): Batch = {
-    sessionStateStore.getBatch(batchId)
+  def getMetadataRequestsRetryRef(identifier: String): Option[MetadataRequestsRetryRef] = {
+    Option(metadataManager.getMetadataRequestsRetryRef(identifier))
   }
 
-  def getBatchesFromStateStore(
+  def deRegisterMetadataRequestsRetryRef(identifier: String): Unit = {
+    metadataManager.deRegisterRequestsRetryRef(identifier)
+  }
+
+  def getBatchFromMetadataStore(batchId: String): Batch = {
+    metadataManager.getBatch(batchId)
+  }
+
+  def getBatchesFromMetadataStore(
       batchType: String,
       batchUser: String,
       batchState: String,
@@ -193,16 +195,16 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
       endTime: Long,
       from: Int,
       size: Int): Seq[Batch] = {
-    sessionStateStore.getBatches(batchType, batchUser, batchState, createTime, endTime, from, size)
+    metadataManager.getBatches(batchType, batchUser, batchState, createTime, endTime, from, size)
   }
 
-  def getBatchSessionMetadata(batchId: String): SessionMetadata = {
-    sessionStateStore.getBatchSessionMetadata(batchId)
+  def getBatchMetadata(batchId: String): Metadata = {
+    metadataManager.getBatchSessionMetadata(batchId)
   }
 
   @VisibleForTesting
   def cleanupMetadata(identifier: String): Unit = {
-    sessionStateStore.cleanupMetadataById(identifier)
+    metadataManager.cleanupMetadataById(identifier)
   }
 
   override def start(): Unit = synchronized {
@@ -215,43 +217,29 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
   }
 
   def getBatchSessionsToRecover(kyuubiInstance: String): Seq[KyuubiBatchSessionImpl] = {
-    val recoveryPerBatch = conf.get(SERVER_STATE_STORE_SESSIONS_RECOVERY_PER_BATCH)
-
-    val batchSessionsToRecover = ListBuffer[KyuubiBatchSessionImpl]()
-    Seq(OperationState.PENDING, OperationState.RUNNING).foreach { stateToRecover =>
-      var offset = 0
-      var lastRecoveryNum = Int.MaxValue
-
-      while (lastRecoveryNum >= recoveryPerBatch) {
-        val metadataList = sessionStateStore.getBatchesRecoveryMetadata(
-          stateToRecover.toString,
-          kyuubiInstance,
-          offset,
-          recoveryPerBatch)
-        metadataList.foreach { metadata =>
-          val batchRequest = new BatchRequest(
-            metadata.engineType,
-            metadata.resource,
-            metadata.className,
-            metadata.requestName,
-            metadata.requestConf.asJava,
-            metadata.requestArgs.asJava)
-
-          val batchSession = createBatchSession(
-            metadata.username,
-            "anonymous",
-            metadata.ipAddress,
-            metadata.requestConf,
-            batchRequest,
-            Some(metadata))
-          batchSessionsToRecover += batchSession
-        }
-
-        lastRecoveryNum = metadataList.size
-        offset += lastRecoveryNum
+    Seq(OperationState.PENDING, OperationState.RUNNING).flatMap { stateToRecover =>
+      metadataManager.getBatchesRecoveryMetadata(
+        stateToRecover.toString,
+        kyuubiInstance,
+        0,
+        Int.MaxValue).map { metadata =>
+        val batchRequest = new BatchRequest(
+          metadata.engineType,
+          metadata.resource,
+          metadata.className,
+          metadata.requestName,
+          metadata.requestConf.asJava,
+          metadata.requestArgs.asJava)
+
+        createBatchSession(
+          metadata.username,
+          "anonymous",
+          metadata.ipAddress,
+          metadata.requestConf,
+          batchRequest,
+          Some(metadata))
       }
     }
-    batchSessionsToRecover
   }
 
   override protected def isServer: Boolean = true
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala
index 8317837cb..d33df3a81 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala
@@ -27,7 +27,7 @@ import org.apache.kyuubi.{KyuubiFunSuite, TestUtils, Utils}
 import org.apache.kyuubi.ctl.CtlConf
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.metrics.MetricsConf
-import org.apache.kyuubi.server.statestore.jdbc.JDBCStateStoreConf
+import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf
 import org.apache.kyuubi.zookeeper.ZookeeperConf
 
 // scalastyle:off line.size.limit
@@ -72,7 +72,7 @@ class AllKyuubiConfiguration extends KyuubiFunSuite {
     KyuubiConf
     CtlConf
     HighAvailabilityConf
-    JDBCStateStoreConf
+    JDBCMetadataStoreConf
     MetricsConf
     ZookeeperConf
 
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index d156bb95a..7c5a1aad1 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -32,11 +32,12 @@ import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
 import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.ApplicationOperation.{APP_ERROR_KEY, APP_ID_KEY, APP_NAME_KEY, APP_STATE_KEY, APP_URL_KEY}
 import org.apache.kyuubi.engine.spark.{SparkBatchProcessBuilder, SparkProcessBuilder}
 import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.KyuubiRestFrontendService
 import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
-import org.apache.kyuubi.server.statestore.api.SessionMetadata
+import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.service.authentication.{KyuubiAuthenticationFactory, UserDefinedEngineSecuritySecretProvider}
 import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionHandle, SessionType}
 
@@ -54,7 +55,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
     sessionManager.allSessions().foreach { session =>
       sessionManager.closeSession(session.handle)
     }
-    sessionManager.getBatchesFromStateStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach {
+    sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach {
       batch =>
         sessionManager.applicationManager.killApplication(None, batch.getId)
         sessionManager.cleanupMetadata(batch.getId)
@@ -381,7 +382,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
     val batchId1 = UUID.randomUUID().toString
     val batchId2 = UUID.randomUUID().toString
 
-    val batchMetadata = SessionMetadata(
+    val batchMetadata = Metadata(
       identifier = batchId1,
       sessionType = SessionType.BATCH,
       realUser = "kyuubi",
@@ -406,8 +407,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
     sessionManager.insertMetadata(batchMetadata)
     sessionManager.insertMetadata(batchMetadata2)
 
-    assert(sessionManager.getBatchFromStateStore(batchId1).getState.equals("PENDING"))
-    assert(sessionManager.getBatchFromStateStore(batchId2).getState.equals("PENDING"))
+    assert(sessionManager.getBatchFromMetadataStore(batchId1).getState.equals("PENDING"))
+    assert(sessionManager.getBatchFromMetadataStore(batchId2).getState.equals("PENDING"))
 
     val sparkBatchProcessBuilder = new SparkBatchProcessBuilder(
       "kyuubi",
@@ -427,10 +428,15 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
       assert(applicationStatus.isDefined)
     }
 
-    sessionManager.updateBatchMetadata(
-      batchId2,
-      OperationState.RUNNING,
-      applicationStatus.get)
+    val metadataToUpdate = Metadata(
+      identifier = batchId2,
+      state = OperationState.RUNNING.toString,
+      engineId = applicationStatus.get.get(APP_ID_KEY).orNull,
+      engineName = applicationStatus.get.get(APP_NAME_KEY).orNull,
+      engineUrl = applicationStatus.get.get(APP_URL_KEY).orNull,
+      engineState = applicationStatus.get.get(APP_STATE_KEY).orNull,
+      engineError = applicationStatus.get.get(APP_ERROR_KEY))
+    sessionManager.updateMetadata(metadataToUpdate)
 
     val restFe = fe.asInstanceOf[KyuubiRestFrontendService]
     restFe.recoverBatchSessions()
@@ -451,7 +457,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
       assert(!session2.batchJobSubmissionOp.builder.processLaunched)
     }
 
-    assert(sessionManager.getBatchesFromStateStore(
+    assert(sessionManager.getBatchesFromMetadataStore(
       "SPARK",
       null,
       null,
@@ -463,7 +469,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
 
   test("get local log internal redirection") {
     val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
-    val metadata = SessionMetadata(
+    val metadata = Metadata(
       identifier = UUID.randomUUID().toString,
       sessionType = SessionType.BATCH,
       realUser = "kyuubi",
@@ -513,7 +519,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
 
   test("delete batch internal redirection") {
     val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
-    val metadata = SessionMetadata(
+    val metadata = Metadata(
       identifier = UUID.randomUUID().toString,
       sessionType = SessionType.BATCH,
       realUser = "kyuubi",
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
new file mode 100644
index 000000000..400b3766e
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.metadata
+
+import java.util.UUID
+
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.metadata.api.Metadata
+import org.apache.kyuubi.session.SessionType
+
+/**
+ * Exercises the retry path of [[MetadataManager]]: direct inserts with and without
+ * `retryOnError`, and requests queued through the per-identifier retry ref.
+ */
+class MetadataManagerSuite extends KyuubiFunSuite {
+  val metadataManager = new MetadataManager()
+  // Retry interval set to 100 so retries happen well within the 3 second `eventually`
+  // windows used below. NOTE(review): unit is presumably milliseconds - confirm in KyuubiConf.
+  val conf = KyuubiConf().set(KyuubiConf.METADATA_REQUEST_RETRY_INTERVAL, 100L)
+
+  // Initialize and start the manager once for the whole suite.
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    metadataManager.initialize(conf)
+    metadataManager.start()
+  }
+
+  // Remove every batch returned by an unfiltered listing, then stop the manager.
+  override def afterAll(): Unit = {
+    metadataManager.getBatches(null, null, null, 0, 0, 0, Int.MaxValue).foreach { batch =>
+      metadataManager.cleanupMetadataById(batch.getId)
+    }
+    metadataManager.stop()
+    super.afterAll()
+  }
+
+  test("retry the metadata store requests") {
+    val metadata = Metadata(
+      identifier = UUID.randomUUID().toString,
+      sessionType = SessionType.BATCH,
+      realUser = "kyuubi",
+      username = "kyuubi",
+      ipAddress = "127.0.0.1",
+      kyuubiInstance = "localhost:10009",
+      state = "PENDING",
+      resource = "intern",
+      className = "org.apache.kyuubi.SparkWC",
+      requestName = "kyuubi_batch",
+      requestConf = Map("spark.master" -> "local"),
+      requestArgs = Seq("100"),
+      createTime = System.currentTimeMillis(),
+      engineType = "spark",
+      clusterManager = Some("local"))
+    // First insert is expected to succeed.
+    metadataManager.insertMetadata(metadata)
+    // Inserting the same metadata again without retry must surface a KyuubiException.
+    // NOTE(review): presumably caused by the duplicate identifier - confirm in the store.
+    intercept[KyuubiException] {
+      metadataManager.insertMetadata(metadata, retryOnError = false)
+    }
+    // With retryOnError = true the same insert must not throw here.
+    // NOTE(review): presumably the failed request is handed to the retry ref - confirm.
+    metadataManager.insertMetadata(metadata, retryOnError = true)
+    val retryRef = metadataManager.getMetadataRequestsRetryRef(metadata.identifier)
+    val metadataToUpdate = metadata.copy(state = "RUNNING")
+    retryRef.addRetryingMetadataRequest(UpdateMetadata(metadataToUpdate))
+    // Expect requests to still be pending and the stored state to still be PENDING.
+    // NOTE(review): presumably the update stays queued behind the insert that keeps
+    // failing - confirm against MetadataManager.
+    eventually(timeout(3.seconds)) {
+      assert(retryRef.hasRemainingRequests())
+      assert(metadataManager.getBatch(metadata.identifier).getState === "PENDING")
+    }
+
+    // Second scenario: a fresh identifier, with insert and update both queued as retry
+    // requests instead of being issued directly.
+    val metadata2 = metadata.copy(identifier = UUID.randomUUID().toString)
+    val metadata2ToUpdate = metadata2.copy(
+      engineId = "app_id",
+      engineName = "app_name",
+      engineUrl = "app_url",
+      engineState = "app_state",
+      state = "RUNNING")
+
+    metadataManager.addMetadataRetryRequest(InsertMetadata(metadata2))
+    metadataManager.addMetadataRetryRequest(UpdateMetadata(metadata2ToUpdate))
+
+    val retryRef2 = metadataManager.getMetadataRequestsRetryRef(metadata2.identifier)
+
+    // Both queued requests should drain, leaving the stored state at RUNNING.
+    eventually(timeout(3.seconds)) {
+      assert(!retryRef2.hasRemainingRequests())
+      assert(metadataManager.getBatch(metadata2.identifier).getState === "RUNNING")
+    }
+  }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStoreSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
similarity index 63%
rename from kyuubi-server/src/test/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStoreSuite.scala
rename to kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
index 9192f95de..7e81931f2 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStoreSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.server.statestore.jdbc
+package org.apache.kyuubi.server.metadata.jdbc
 
 import java.util.UUID
 
@@ -24,22 +24,22 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.server.statestore.api.SessionMetadata
-import org.apache.kyuubi.server.statestore.jdbc.JDBCStateStoreConf._
+import org.apache.kyuubi.server.metadata.api.Metadata
+import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf._
 import org.apache.kyuubi.session.SessionType
 
-class JDBCStateStoreSuite extends KyuubiFunSuite {
+class JDBCMetadataStoreSuite extends KyuubiFunSuite {
   private val conf = KyuubiConf()
-    .set(SERVER_STATE_STORE_JDBC_DATABASE_TYPE, DatabaseType.DERBY.toString)
-    .set(SERVER_STATE_STORE_JDBC_DATABASE_SCHEMA_INIT, true)
-    .set(s"$STATE_STORE_JDBC_DATASOURCE_PREFIX.connectionTimeout", "3000")
-    .set(s"$STATE_STORE_JDBC_DATASOURCE_PREFIX.maximumPoolSize", "99")
-    .set(s"$STATE_STORE_JDBC_DATASOURCE_PREFIX.idleTimeout", "60000")
-  private val jdbcStateStore = new JDBCStateStore(conf)
+    .set(METADATA_STORE_JDBC_DATABASE_TYPE, DatabaseType.DERBY.toString)
+    .set(METADATA_STORE_JDBC_DATABASE_SCHEMA_INIT, true)
+    .set(s"$METADATA_STORE_JDBC_DATASOURCE_PREFIX.connectionTimeout", "3000")
+    .set(s"$METADATA_STORE_JDBC_DATASOURCE_PREFIX.maximumPoolSize", "99")
+    .set(s"$METADATA_STORE_JDBC_DATASOURCE_PREFIX.idleTimeout", "60000")
+  private val jdbcMetadataStore = new JDBCMetadataStore(conf)
 
   override def afterAll(): Unit = {
     super.afterAll()
-    jdbcStateStore.getMetadataList(
+    jdbcMetadataStore.getMetadataList(
       null,
       null,
       null,
@@ -51,21 +51,21 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
       Int.MaxValue,
       true).foreach {
       batch =>
-        jdbcStateStore.cleanupMetadataByIdentifier(batch.identifier)
+        jdbcMetadataStore.cleanupMetadataByIdentifier(batch.identifier)
     }
-    jdbcStateStore.close()
+    jdbcMetadataStore.close()
   }
 
   test("test jdbc datasource properties") {
-    assert(jdbcStateStore.hikariDataSource.getConnectionTimeout == 3000)
-    assert(jdbcStateStore.hikariDataSource.getMaximumPoolSize == 99)
-    assert(jdbcStateStore.hikariDataSource.getIdleTimeout == 60000)
+    assert(jdbcMetadataStore.hikariDataSource.getConnectionTimeout == 3000)
+    assert(jdbcMetadataStore.hikariDataSource.getMaximumPoolSize == 99)
+    assert(jdbcMetadataStore.hikariDataSource.getIdleTimeout == 60000)
   }
 
-  test("jdbc state store") {
+  test("jdbc metadata store") {
     val batchId = UUID.randomUUID().toString
     val kyuubiInstance = "localhost:10099"
-    var batchMetadata = SessionMetadata(
+    var batchMetadata = Metadata(
       identifier = batchId,
       sessionType = SessionType.BATCH,
       realUser = "kyuubi",
@@ -88,29 +88,39 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
       requestConf = Map.empty,
       requestArgs = Seq.empty)
 
-    jdbcStateStore.insertMetadata(batchMetadata)
-    assert(jdbcStateStore.getMetadata(batchId, true) != batchStateOnlyMetadata)
-    assert(jdbcStateStore.getMetadata(batchId, false) != batchMetadata)
+    jdbcMetadataStore.insertMetadata(batchMetadata)
+    assert(jdbcMetadataStore.getMetadata(batchId, true) != batchStateOnlyMetadata)
+    assert(jdbcMetadataStore.getMetadata(batchId, false) != batchMetadata)
 
     // the engine type is formatted with UPPER
     batchMetadata = batchMetadata.copy(engineType = "SPARK")
     batchStateOnlyMetadata = batchStateOnlyMetadata.copy(engineType = "SPARK")
-    assert(jdbcStateStore.getMetadata(batchId, true) == batchStateOnlyMetadata)
-    assert(jdbcStateStore.getMetadata(batchId, false) == batchMetadata)
+    assert(jdbcMetadataStore.getMetadata(batchId, true) == batchStateOnlyMetadata)
+    assert(jdbcMetadataStore.getMetadata(batchId, false) == batchMetadata)
 
-    jdbcStateStore.cleanupMetadataByIdentifier(batchId)
-    assert(jdbcStateStore.getMetadata(batchId, true) == null)
+    jdbcMetadataStore.cleanupMetadataByIdentifier(batchId)
+    assert(jdbcMetadataStore.getMetadata(batchId, true) == null)
 
-    jdbcStateStore.insertMetadata(batchMetadata)
+    jdbcMetadataStore.insertMetadata(batchMetadata)
 
     val batchState2 = batchStateOnlyMetadata.copy(identifier = UUID.randomUUID().toString)
-    jdbcStateStore.insertMetadata(batchState2)
+    jdbcMetadataStore.insertMetadata(batchState2)
 
     var batches =
-      jdbcStateStore.getMetadataList(SessionType.BATCH, "Spark", null, null, null, 0, 0, 0, 1, true)
+      jdbcMetadataStore.getMetadataList(
+        SessionType.BATCH,
+        "Spark",
+        null,
+        null,
+        null,
+        0,
+        0,
+        0,
+        1,
+        true)
     assert(batches == Seq(batchStateOnlyMetadata))
 
-    batches = jdbcStateStore.getMetadataList(
+    batches = jdbcMetadataStore.getMetadataList(
       SessionType.BATCH,
       "spark",
       "kyuubi",
@@ -123,10 +133,10 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
       true)
     assert(batches == Seq(batchStateOnlyMetadata, batchState2))
 
-    jdbcStateStore.cleanupMetadataByIdentifier(batchState2.identifier)
+    jdbcMetadataStore.cleanupMetadataByIdentifier(batchState2.identifier)
 
     batches =
-      jdbcStateStore.getMetadataList(
+      jdbcMetadataStore.getMetadataList(
         SessionType.SQL,
         "SPARK",
         "kyuubi",
@@ -140,7 +150,7 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
     assert(batches.isEmpty)
 
     batches =
-      jdbcStateStore.getMetadataList(
+      jdbcMetadataStore.getMetadataList(
         SessionType.BATCH,
         "SPARK",
         "kyuubi",
@@ -154,7 +164,7 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
     assert(batches == Seq(batchStateOnlyMetadata))
 
     batches =
-      jdbcStateStore.getMetadataList(
+      jdbcMetadataStore.getMetadataList(
         SessionType.BATCH,
         "SPARK",
         "kyuubi",
@@ -168,7 +178,7 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
     assert(batches.isEmpty)
 
     batches =
-      jdbcStateStore.getMetadataList(
+      jdbcMetadataStore.getMetadataList(
         SessionType.BATCH,
         "SPARK",
         "no_kyuubi",
@@ -181,7 +191,7 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
         true)
     assert(batches.isEmpty)
 
-    batches = jdbcStateStore.getMetadataList(
+    batches = jdbcMetadataStore.getMetadataList(
       SessionType.BATCH,
       "SPARK",
       null,
@@ -194,7 +204,7 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
       true)
     assert(batches == Seq(batchStateOnlyMetadata))
 
-    batches = jdbcStateStore.getMetadataList(
+    batches = jdbcMetadataStore.getMetadataList(
       SessionType.BATCH,
       null,
       null,
@@ -207,7 +217,7 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
       true)
     assert(batches == Seq(batchStateOnlyMetadata))
 
-    var batchesToRecover = jdbcStateStore.getMetadataList(
+    var batchesToRecover = jdbcMetadataStore.getMetadataList(
       SessionType.BATCH,
       null,
       null,
@@ -220,7 +230,7 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
       false)
     assert(batchesToRecover == Seq(batchMetadata))
 
-    batchesToRecover = jdbcStateStore.getMetadataList(
+    batchesToRecover = jdbcMetadataStore.getMetadataList(
       SessionType.BATCH,
       null,
       null,
@@ -240,15 +250,15 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
       engineUrl = "app_url",
       engineState = "RUNNING",
       engineError = None)
-    jdbcStateStore.updateMetadata(newBatchState)
-    assert(jdbcStateStore.getMetadata(batchId, true) == newBatchState)
+    jdbcMetadataStore.updateMetadata(newBatchState)
+    assert(jdbcMetadataStore.getMetadata(batchId, true) == newBatchState)
 
     newBatchState = newBatchState.copy(state = "FINISHED", endTime = System.currentTimeMillis())
-    jdbcStateStore.updateMetadata(newBatchState)
+    jdbcMetadataStore.updateMetadata(newBatchState)
 
-    assert(jdbcStateStore.getMetadata(batchId, true) == newBatchState)
+    assert(jdbcMetadataStore.getMetadata(batchId, true) == newBatchState)
 
-    assert(jdbcStateStore.getMetadataList(
+    assert(jdbcMetadataStore.getMetadataList(
       SessionType.BATCH,
       null,
       null,
@@ -260,7 +270,7 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
       Int.MaxValue,
       false).isEmpty)
 
-    assert(jdbcStateStore.getMetadataList(
+    assert(jdbcMetadataStore.getMetadataList(
       SessionType.BATCH,
       null,
       null,
@@ -273,8 +283,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite {
       false).isEmpty)
 
     eventually(Timeout(3.seconds)) {
-      jdbcStateStore.cleanupMetadataByAge(1000)
-      assert(jdbcStateStore.getMetadata(batchId, true) == null)
+      jdbcMetadataStore.cleanupMetadataByAge(1000)
+      assert(jdbcMetadataStore.getMetadata(batchId, true) == null)
     }
   }
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index de662e5e8..948224e0b 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -69,7 +69,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
     sessionManager.allSessions().foreach { session =>
       sessionManager.closeSession(session.handle)
     }
-    sessionManager.getBatchesFromStateStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach {
+    sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach {
       batch =>
         sessionManager.applicationManager.killApplication(None, batch.getId)
         sessionManager.cleanupMetadata(batch.getId)