You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/13 03:32:22 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1989] Decouple curator from other modules

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 91010689d [KYUUBI #1989] Decouple curator from other modules
91010689d is described below

commit 91010689dbe9dac37c58fbf57eab40a9068c3dfc
Author: hongdongdong <ho...@cmss.chinamobile.com>
AuthorDate: Wed Apr 13 11:32:13 2022 +0800

    [KYUUBI #1989] Decouple curator from other modules
    
    ### _Why are the changes needed?_
    
    Decouple curator from other modules.
    
    Main changes:
    1. `ZooKeeperClientProvider` -> `zookeeper.ZookeeperClientProvider`
    2. `zkClient` apis -> `DiscoveryClient`
    3. `ZooKeeperClientProvider.withZkClient` -> `DiscoveryClientProvider.withDiscoveryClient`
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1991 from hddong/decuple-discovery.
    
    Closes #1989
    
    3efcbbff [hongdongdong] fix
    2233acc2 [hongdongdong] [KYUUBI #1989] Decouple curator from other modules
    
    Authored-by: hongdongdong <ho...@cmss.chinamobile.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../engine/spark/ShareLevelSparkEngineSuite.scala  |  16 +-
 .../engine/spark/WithDiscoverySparkSQLEngine.scala |  12 +-
 .../org/apache/kyuubi/ctl/ServiceControlCli.scala  |  45 ++-
 .../apache/kyuubi/ctl/ServiceControlCliSuite.scala |  42 ++-
 .../apache/kyuubi/ha/HighAvailabilityConf.scala    |  11 +-
 .../{ZooKeeperAuthTypes.scala => AuthTypes.scala}  |   4 +-
 .../apache/kyuubi/ha/client/DiscoveryClient.scala  | 192 +++++++++++
 ...thTypes.scala => DiscoveryClientProvider.scala} |  25 +-
 ...oKeeperAuthTypes.scala => DiscoveryPaths.scala} |  12 +-
 .../kyuubi/ha/client/EngineServiceDiscovery.scala  |   2 +-
 .../apache/kyuubi/ha/client/ServiceDiscovery.scala |  97 +-----
 .../client/zookeeper/ServiceDiscoveryClient.scala  | 251 --------------
 .../ZookeeperACLProvider.scala}                    |  13 +-
 .../ZookeeperClientProvider.scala}                 |  36 +-
 .../zookeeper/ZookeeperDiscoveryClient.scala       | 380 +++++++++++++++++++++
 ...ZooKeeperEngineSecuritySecretProviderImpl.scala |   8 +-
 .../ha/client/DiscoveryClientProviderSuite.scala}  |  17 +-
 .../ZookeeperClientProviderSuite.scala}            |  19 +-
 .../ZookeeperDiscoveryClientSuite.scala}           |  57 ++--
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  64 ++--
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  |   6 +-
 .../org/apache/kyuubi/server/KyuubiServer.scala    |  22 +-
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  |   6 +-
 .../scala/org/apache/kyuubi/WithKyuubiServer.scala |   4 +-
 .../org/apache/kyuubi/engine/EngineRefSuite.scala  |  47 ++-
 .../engine/spark/SparkProcessBuilderSuite.scala    |   4 +-
 .../KyuubiOperationWithEngineSecurity.scala        |  21 +-
 .../apache/kyuubi/server/KyuubiServerSuite.scala   |  10 +-
 28 files changed, 840 insertions(+), 583 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
index 47ed53ce4..b75800d5f 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
@@ -50,36 +50,36 @@ abstract class ShareLevelSparkEngineSuite
   }
 
   test("check discovery service is clean up with different share level") {
-    withZkClient { zkClient =>
+    withDiscoveryClient { discoveryClient =>
       assert(engine.getServiceState == ServiceState.STARTED)
-      assert(zkClient.checkExists().forPath(namespace) != null)
+      assert(discoveryClient.pathExists(namespace))
       withJdbcStatement() { _ => }
       shareLevel match {
         // Connection level, we will cleanup namespace since it's always a global unique value.
         case ShareLevel.CONNECTION =>
           assert(engine.getServiceState == ServiceState.STOPPED)
-          assert(zkClient.checkExists().forPath(namespace) == null)
+          assert(discoveryClient.pathNonExists(namespace))
         case _ =>
           assert(engine.getServiceState == ServiceState.STARTED)
-          assert(zkClient.checkExists().forPath(namespace) != null)
+          assert(discoveryClient.pathExists(namespace))
       }
     }
   }
 
   test("test spark engine max life-time") {
-    withZkClient { zkClient =>
+    withDiscoveryClient { discoveryClient =>
       assert(engine.getServiceState == ServiceState.STARTED)
-      assert(zkClient.checkExists().forPath(namespace) != null)
+      assert(discoveryClient.pathExists(namespace))
       withJdbcStatement() { _ => }
 
       eventually(Timeout(30.seconds)) {
         shareLevel match {
           case ShareLevel.CONNECTION =>
             assert(engine.getServiceState == ServiceState.STOPPED)
-            assert(zkClient.checkExists().forPath(namespace) == null)
+            assert(discoveryClient.pathNonExists(namespace))
           case _ =>
             assert(engine.getServiceState == ServiceState.STOPPED)
-            assert(zkClient.checkExists().forPath(namespace) != null)
+            assert(discoveryClient.pathExists(namespace))
         }
       }
     }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
index 481b8ec7e..fa52b8c7c 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
@@ -17,12 +17,12 @@
 
 package org.apache.kyuubi.engine.spark
 
-import org.apache.curator.framework.CuratorFramework
-
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_AUTH_TYPE, HA_ZK_NAMESPACE, HA_ZK_QUORUM}
-import org.apache.kyuubi.ha.client.{ZooKeeperAuthTypes, ZooKeeperClientProvider}
+import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
 trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
@@ -32,7 +32,7 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
     assert(zkServer != null)
     Map(
       HA_ZK_QUORUM.key -> zkServer.getConnectString,
-      HA_ZK_AUTH_TYPE.key -> ZooKeeperAuthTypes.NONE.toString,
+      HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString,
       HA_ZK_NAMESPACE.key -> namespace)
   }
 
@@ -62,8 +62,8 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
     stopSparkEngine()
   }
 
-  def withZkClient(f: CuratorFramework => Unit): Unit = {
-    ZooKeeperClientProvider.withZkClient(kyuubiConf)(f)
+  def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
+    DiscoveryClientProvider.withDiscoveryClient(kyuubiConf)(f)
   }
 
   protected def getDiscoveryConnectionString: String = {
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala
index 44da3cda4..2585989a2 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala
@@ -19,15 +19,14 @@ package org.apache.kyuubi.ctl
 
 import scala.collection.mutable.ListBuffer
 
-import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.utils.ZKPaths
-
 import org.apache.kyuubi.{KYUUBI_VERSION, Logging}
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_TYPE
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
+import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryPaths
 
 private[ctl] object ServiceControlAction extends Enumeration {
   type ServiceControlAction = Value
@@ -43,9 +42,8 @@ private[ctl] object ServiceControlObject extends Enumeration {
  * Main gateway of launching a Kyuubi Ctl action.
  */
 private[kyuubi] class ServiceControlCli extends Logging {
+  import DiscoveryClientProvider._
   import ServiceControlCli._
-  import ServiceDiscovery._
-  import ZooKeeperClientProvider._
 
   private var verbose: Boolean = false
 
@@ -84,21 +82,20 @@ private[kyuubi] class ServiceControlCli extends Logging {
     val kyuubiConf = args.conf
 
     kyuubiConf.setIfMissing(HA_ZK_QUORUM, args.cliArgs.zkQuorum)
-    withZkClient(kyuubiConf) { zkClient =>
-      val fromNamespace = ZKPaths.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE))
+    withDiscoveryClient(kyuubiConf) { discoveryClient =>
+      val fromNamespace = DiscoveryPaths.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE))
       val toNamespace = getZkNamespace(args)
 
-      val currentServerNodes = getServiceNodesInfo(zkClient, fromNamespace)
+      val currentServerNodes = discoveryClient.getServiceNodesInfo(fromNamespace)
       val exposedServiceNodes = ListBuffer[ServiceNodeInfo]()
 
       if (currentServerNodes.nonEmpty) {
-        def doCreate(zc: CuratorFramework): Unit = {
+        def doCreate(zc: DiscoveryClient): Unit = {
           currentServerNodes.foreach { sn =>
             info(s"Exposing server instance:${sn.instance} with version:${sn.version}" +
               s" from $fromNamespace to $toNamespace")
-            val newNodePath = createAndGetServiceNode(
+            val newNodePath = zc.createAndGetServiceNode(
               kyuubiConf,
-              zc,
               args.cliArgs.namespace,
               sn.instance,
               sn.version,
@@ -110,10 +107,10 @@ private[kyuubi] class ServiceControlCli extends Logging {
         }
 
         if (kyuubiConf.get(HA_ZK_QUORUM) == args.cliArgs.zkQuorum) {
-          doCreate(zkClient)
+          doCreate(discoveryClient)
         } else {
           kyuubiConf.set(HA_ZK_QUORUM, args.cliArgs.zkQuorum)
-          withZkClient(kyuubiConf)(doCreate)
+          withDiscoveryClient(kyuubiConf)(doCreate)
         }
       }
 
@@ -126,13 +123,13 @@ private[kyuubi] class ServiceControlCli extends Logging {
    * List Kyuubi server nodes info.
    */
   private def list(args: ServiceControlCliArguments, filterHostPort: Boolean): Unit = {
-    withZkClient(args.conf) { zkClient =>
+    withDiscoveryClient(args.conf) { discoveryClient =>
       val znodeRoot = getZkNamespace(args)
       val hostPortOpt =
         if (filterHostPort) {
           Some((args.cliArgs.host, args.cliArgs.port.toInt))
         } else None
-      val nodes = getServiceNodes(zkClient, znodeRoot, hostPortOpt)
+      val nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
 
       val title = "Zookeeper service nodes"
       info(renderServiceNodesInfo(title, nodes, verbose))
@@ -140,10 +137,10 @@ private[kyuubi] class ServiceControlCli extends Logging {
   }
 
   private def getServiceNodes(
-      zkClient: CuratorFramework,
+      discoveryClient: DiscoveryClient,
       znodeRoot: String,
       hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
-    val serviceNodes = getServiceNodesInfo(zkClient, znodeRoot)
+    val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
     hostPortOpt match {
       case Some((host, port)) => serviceNodes.filter { sn =>
           sn.host == host && sn.port == port
@@ -156,17 +153,17 @@ private[kyuubi] class ServiceControlCli extends Logging {
    * Delete zookeeper service node with specified host port.
    */
   private def delete(args: ServiceControlCliArguments): Unit = {
-    withZkClient(args.conf) { zkClient =>
+    withDiscoveryClient(args.conf) { discoveryClient =>
       val znodeRoot = getZkNamespace(args)
       val hostPortOpt = Some((args.cliArgs.host, args.cliArgs.port.toInt))
-      val nodesToDelete = getServiceNodes(zkClient, znodeRoot, hostPortOpt)
+      val nodesToDelete = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
 
       val deletedNodes = ListBuffer[ServiceNodeInfo]()
       nodesToDelete.foreach { node =>
         val nodePath = s"$znodeRoot/${node.nodeName}"
         info(s"Deleting zookeeper service node:$nodePath")
         try {
-          zkClient.delete().forPath(nodePath)
+          discoveryClient.delete(nodePath)
           deletedNodes += node
         } catch {
           case e: Exception =>
@@ -227,7 +224,7 @@ object ServiceControlCli extends CommandLineUtils with Logging {
   private[ctl] def getZkNamespace(args: ServiceControlCliArguments): String = {
     args.cliArgs.service match {
       case ServiceControlObject.SERVER =>
-        ZKPaths.makePath(null, args.cliArgs.namespace)
+        DiscoveryPaths.makePath(null, args.cliArgs.namespace)
       case ServiceControlObject.ENGINE =>
         val engineType = Some(args.cliArgs.engineType)
           .filter(_ != null).filter(_.nonEmpty)
@@ -237,10 +234,10 @@ object ServiceControlCli extends CommandLineUtils with Logging {
           .getOrElse(args.conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default"))
         // The path of the engine defined in zookeeper comes from
         // org.apache.kyuubi.engine.EngineRef#engineSpace
-        ZKPaths.makePath(
+        DiscoveryPaths.makePath(
           s"${args.cliArgs.namespace}_${KYUUBI_VERSION}_${ShareLevel.USER}_${engineType}",
           args.cliArgs.user,
-          engineSubdomain)
+          Array(engineSubdomain))
     }
   }
 
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala
index 5c177ddcc..0cae9d992 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala
@@ -20,13 +20,12 @@ package org.apache.kyuubi.ctl
 import java.io.{OutputStream, PrintStream}
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_NAMESPACE, HA_ZK_QUORUM}
-import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
+import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
 trait TestPrematureExit {
@@ -86,9 +85,8 @@ trait TestPrematureExit {
 }
 
 class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
+  import DiscoveryClientProvider._
   import ServiceControlCli._
-  import ServiceDiscovery._
-  import ZooKeeperClientProvider._
 
   val zkServer = new EmbeddedZookeeper()
   val conf: KyuubiConf = KyuubiConf()
@@ -226,9 +224,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
     System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace)
 
-    withZkClient(conf) { framework =>
-      createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
-      createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+    withDiscoveryClient(conf) { framework =>
+      framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
+      framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")
 
       val newNamespace = getUniqueNamespace()
       val args = Array(
@@ -245,7 +243,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
 
       testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
       val znodeRoot = s"/$newNamespace"
-      val children = framework.getChildren.forPath(znodeRoot).asScala.sorted
+      val children = framework.getChildren(znodeRoot).sorted
       assert(children.size == 2)
 
       assert(children.head.startsWith(
@@ -253,7 +251,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       assert(children.last.startsWith(
         s"serviceUri=localhost:10001;version=$KYUUBI_VERSION;sequence="))
       children.foreach { child =>
-        framework.delete().forPath(s"""$znodeRoot/$child""")
+        framework.delete(s"""$znodeRoot/$child""")
       }
     }
   }
@@ -290,9 +288,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       .set(HA_ZK_NAMESPACE, uniqueNamespace)
       .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
 
-    withZkClient(conf) { framework =>
-      createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
-      createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+    withDiscoveryClient(conf) { framework =>
+      framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
+      framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")
 
       val args = Array(
         "list",
@@ -319,9 +317,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       .set(HA_ZK_NAMESPACE, uniqueNamespace)
       .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
 
-    withZkClient(conf) { framework =>
-      createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
-      createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+    withDiscoveryClient(conf) { framework =>
+      framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
+      framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")
 
       val args = Array(
         "get",
@@ -351,10 +349,10 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       .set(HA_ZK_NAMESPACE, uniqueNamespace)
       .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
 
-    withZkClient(conf) { framework =>
-      withZkClient(conf) { zc =>
-        createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
-        createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
+    withDiscoveryClient(conf) { framework =>
+      withDiscoveryClient(conf) { zc =>
+        framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000", external = true)
+        framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001", external = true)
       }
 
       val args = Array(
@@ -385,9 +383,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       .set(HA_ZK_NAMESPACE, uniqueNamespace)
       .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
 
-    withZkClient(conf) { framework =>
-      createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
-      createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+    withDiscoveryClient(conf) { framework =>
+      framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
+      framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")
 
       val args = Array(
         "list",
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
index 34a337c4d..c6bd587b3 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
@@ -22,7 +22,8 @@ import java.time.Duration
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
-import org.apache.kyuubi.ha.client.{RetryPolicies, ZooKeeperAuthTypes}
+import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.client.RetryPolicies
 
 object HighAvailabilityConf {
 
@@ -51,16 +52,16 @@ object HighAvailabilityConf {
   val HA_ZK_AUTH_TYPE: ConfigEntry[String] =
     buildConf("kyuubi.ha.zookeeper.auth.type")
       .doc("The type of zookeeper authentication, all candidates are " +
-        s"${ZooKeeperAuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
+        s"${AuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
       .version("1.3.2")
       .stringConf
-      .checkValues(ZooKeeperAuthTypes.values.map(_.toString))
-      .createWithDefault(ZooKeeperAuthTypes.NONE.toString)
+      .checkValues(AuthTypes.values.map(_.toString))
+      .createWithDefault(AuthTypes.NONE.toString)
 
   val HA_ZK_ENGINE_AUTH_TYPE: ConfigEntry[String] =
     buildConf("kyuubi.ha.zookeeper.engine.auth.type")
       .doc("The type of zookeeper authentication for engine, all candidates are " +
-        s"${ZooKeeperAuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
+        s"${AuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
       .version("1.3.2")
       .fallbackConf(HA_ZK_AUTH_TYPE)
 
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/AuthTypes.scala
similarity index 91%
copy from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala
copy to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/AuthTypes.scala
index da28db730..0c02764fb 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/AuthTypes.scala
@@ -17,9 +17,9 @@
 
 package org.apache.kyuubi.ha.client
 
-object ZooKeeperAuthTypes extends Enumeration {
+object AuthTypes extends Enumeration {
 
-  type ZooKeeperAuthType = Value
+  type AuthTypes = Value
 
   val NONE, KERBEROS, DIGEST = Value
 
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
new file mode 100644
index 000000000..1a390d418
--- /dev/null
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+
+/**
+ * A collection of APIs that a discovery client needs to implement.
+ */
+trait DiscoveryClient extends Logging {
+
+  /**
+   * Create a discovery client.
+   */
+  def createClient(): Unit
+
+  /**
+   * Close the discovery client.
+   */
+  def closeClient(): Unit
+
+  /**
+   * Create path on discovery service.
+   */
+  def create(path: String, mode: String, createParent: Boolean = true): String
+
+  /**
+   * Get the stored data under path.
+   */
+  def getData(path: String): Array[Byte]
+
+  /**
+   * Get the paths under given path.
+   * @return list of paths
+   */
+  def getChildren(path: String): List[String]
+
+  /**
+   * Check if the path exists.
+   */
+  def pathExists(path: String): Boolean
+
+  /**
+   * Check if the path does not exist.
+   */
+  def pathNonExists(path: String): Boolean
+
+  /**
+   * Delete a path.
+   * @param path the path to be deleted
+   * @param deleteChildren if true, will also delete children if they exist.
+   */
+  def delete(path: String, deleteChildren: Boolean = false): Unit
+
+  /**
+   * Add a monitor for serviceDiscovery. It is used to stop service discovery gracefully
+   * when disconnected.
+   */
+  def monitorState(serviceDiscovery: ServiceDiscovery): Unit
+
+  /**
+   * The distributed lock path used to ensure only one engine is created for non-CONNECTION
+   * share level.
+   */
+  def tryWithLock[T](
+      lockPath: String,
+      timeout: Long,
+      unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T
+
+  /**
+   * Get the engine address and port from engine space.
+   * @return engine host and port
+   */
+  def getServerHost(namespace: String): Option[(String, Int)]
+
+  /**
+   * Get engine info by engine ref id from engine space.
+   * @param namespace the path to get engine ref
+   * @param engineRefId engine ref id
+   * @return engine host and port
+   */
+  def getEngineByRefId(
+      namespace: String,
+      engineRefId: String): Option[(String, Int)]
+
+  /**
+   * Get service node info from server space.
+   * @param namespace the path to get node info
+   * @param sizeOpt how many nodes to pick
+   * @param silent if true, error message will not be logged
+   * @return Service node info
+   */
+  def getServiceNodesInfo(
+      namespace: String,
+      sizeOpt: Option[Int] = None,
+      silent: Boolean = false): Seq[ServiceNodeInfo]
+
+  /**
+   * Register Kyuubi instance on discovery service.
+   * @param conf Kyuubi config
+   * @param namespace the path to register instance
+   * @param serviceDiscovery service discovery
+   * @param version kyuubi version
+   * @param external if true,
+   *                 the service info will not be automatically deleted upon client's disconnect
+   */
+  def registerService(
+      conf: KyuubiConf,
+      namespace: String,
+      serviceDiscovery: ServiceDiscovery,
+      version: Option[String] = None,
+      external: Boolean = false): Unit
+
+  /**
+   * Deregister Kyuubi instance on discovery service.
+   */
+  def deregisterService(): Unit
+
+  /**
+   * Request removal of the Kyuubi instance from the discovery service.
+   */
+  def postDeregisterService(namespace: String): Boolean
+
+  /**
+   * Create server service node info on discovery and get the actual path.
+   * @param conf Kyuubi config
+   * @param namespace the path to register instance
+   * @param instance server info, host:port
+   * @param version kyuubi version
+   * @param external if true,
+   *                 the service info will not be automatically deleted upon client's disconnect
+   */
+  def createAndGetServiceNode(
+      conf: KyuubiConf,
+      namespace: String,
+      instance: String,
+      version: Option[String] = None,
+      external: Boolean = false): String
+
+  /**
+   * Create a node to store engine secret.
+   * @param createMode create node mode, automatically deleted or not
+   * @param basePath the base path for the node
+   * @param initData the init data to be stored
+   * @param useProtection if true, createBuilder with protection
+   */
+  def startSecretNode(
+      createMode: String,
+      basePath: String,
+      initData: String,
+      useProtection: Boolean = false): Unit
+}
+
+object DiscoveryClient {
+
+  /**
+   * Parse instance info string, get host and port.
+   */
+  private[client] def parseInstanceHostPort(instance: String): (String, Int) = {
+    val maybeInfos = instance.split(";")
+      .map(_.split("=", 2))
+      .filter(_.size == 2)
+      .map(i => (i(0), i(1)))
+      .toMap
+    if (maybeInfos.size > 0) {
+      (
+        maybeInfos.get("hive.server2.thrift.bind.host").get,
+        maybeInfos.get("hive.server2.thrift.port").get.toInt)
+    } else {
+      val strings = instance.split(":")
+      (strings(0), strings(1).toInt)
+    }
+  }
+}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClientProvider.scala
similarity index 54%
copy from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala
copy to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClientProvider.scala
index da28db730..be908e2fc 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClientProvider.scala
@@ -17,10 +17,29 @@
 
 package org.apache.kyuubi.ha.client
 
-object ZooKeeperAuthTypes extends Enumeration {
+import java.io.IOException
 
-  type ZooKeeperAuthType = Value
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient
 
-  val NONE, KERBEROS, DIGEST = Value
+object DiscoveryClientProvider extends Logging {
+
+  /**
+   * Creates a zookeeper discovery client before calling `f` and closes it after calling `f`.
+   */
+  def withDiscoveryClient[T](conf: KyuubiConf)(f: DiscoveryClient => T): T = {
+    val discoveryClient = new ZookeeperDiscoveryClient(conf)
+    try {
+      discoveryClient.createClient()
+      f(discoveryClient)
+    } finally {
+      try {
+        discoveryClient.closeClient()
+      } catch {
+        case e: IOException => error("Failed to release the zkClient", e)
+      }
+    }
+  }
 
 }
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryPaths.scala
similarity index 72%
copy from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala
copy to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryPaths.scala
index da28db730..4116fd6cf 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryPaths.scala
@@ -17,10 +17,14 @@
 
 package org.apache.kyuubi.ha.client
 
-object ZooKeeperAuthTypes extends Enumeration {
+import org.apache.curator.utils.ZKPaths
 
-  type ZooKeeperAuthType = Value
-
-  val NONE, KERBEROS, DIGEST = Value
+object DiscoveryPaths {
+  def makePath(parent: String, firstChild: String): String = {
+    ZKPaths.makePath(parent, firstChild)
+  }
 
+  def makePath(parent: String, firstChild: String, restChildren: Array[String]): String = {
+    ZKPaths.makePath(parent, firstChild, restChildren: _*)
+  }
 }
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
index 98127edf3..88388f3ba 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
@@ -37,7 +37,7 @@ class EngineServiceDiscovery(
         // For connection level, we should clean up the namespace in zk in case the disk stress.
         case "CONNECTION" =>
           try {
-            if (discoveryClient.postDeregisterService) {
+            if (discoveryClient.postDeregisterService(namespace)) {
               info("Clean up discovery service due to this is connection share level.")
             }
           } catch {
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index 1723561a4..c65a4aa91 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -17,20 +17,12 @@
 
 package org.apache.kyuubi.ha.client
 
-import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.JavaConverters._
-
-import com.google.common.annotations.VisibleForTesting
-import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.utils.ZKPaths
-
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient
-import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient
 import org.apache.kyuubi.service.{AbstractService, FrontendService}
 
 /**
@@ -45,21 +37,28 @@ abstract class ServiceDiscovery(
 
   protected val isServerLost = new AtomicBoolean(false)
 
-  private var _discoveryClient: ServiceDiscoveryClient = _
+  /**
+   * a pre-defined namespace used to publish the instance of the associate service
+   */
+  private var _namespace: String = _
+  private var _discoveryClient: DiscoveryClient = _
 
-  def discoveryClient: ServiceDiscoveryClient = _discoveryClient
+  def namespace: String = _namespace
+  def discoveryClient: DiscoveryClient = _discoveryClient
 
   override def initialize(conf: KyuubiConf): Unit = {
     this.conf = conf
 
-    _discoveryClient = new ServiceDiscoveryClient(this)
-    discoveryClient.createClient(conf)
+    _namespace = conf.get(HA_ZK_NAMESPACE)
+    _discoveryClient = new ZookeeperDiscoveryClient(conf)
+    discoveryClient.monitorState(this)
+    discoveryClient.createClient()
 
     super.initialize(conf)
   }
 
   override def start(): Unit = {
-    discoveryClient.registerService(conf)
+    discoveryClient.registerService(conf, namespace, this)
     super.start()
   }
 
@@ -80,74 +79,4 @@ object ServiceDiscovery extends Logging {
     val zkEnsemble = conf.get(HA_ZK_QUORUM)
     zkEnsemble != null && zkEnsemble.nonEmpty
   }
-
-  def getServerHost(zkClient: CuratorFramework, namespace: String): Option[(String, Int)] = {
-    // TODO: use last one because to avoid touching some maybe-crashed engines
-    // We need a big improvement here.
-    getServiceNodesInfo(zkClient, namespace, Some(1), silent = true) match {
-      case Seq(sn) => Some((sn.host, sn.port))
-      case _ => None
-    }
-  }
-
-  def getEngineByRefId(
-      zkClient: CuratorFramework,
-      namespace: String,
-      engineRefId: String): Option[(String, Int)] = {
-    getServiceNodesInfo(zkClient, namespace, silent = true)
-      .find(_.engineRefId.exists(_.equals(engineRefId)))
-      .map(data => (data.host, data.port))
-  }
-
-  def getServiceNodesInfo(
-      zkClient: CuratorFramework,
-      namespace: String,
-      sizeOpt: Option[Int] = None,
-      silent: Boolean = false): Seq[ServiceNodeInfo] = {
-    try {
-      val hosts = zkClient.getChildren.forPath(namespace)
-      val size = sizeOpt.getOrElse(hosts.size())
-      hosts.asScala.takeRight(size).map { p =>
-        val path = ZKPaths.makePath(namespace, p)
-        val instance = new String(zkClient.getData.forPath(path), StandardCharsets.UTF_8)
-        val (host, port) = parseInstanceHostPort(instance)
-        val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
-        val engineRefId = p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId="))
-        info(s"Get service instance:$instance and version:$version under $namespace")
-        ServiceNodeInfo(namespace, p, host, port, version, engineRefId)
-      }
-    } catch {
-      case _: Exception if silent => Nil
-      case e: Exception =>
-        error(s"Failed to get service node info", e)
-        Nil
-    }
-  }
-
-  @VisibleForTesting
-  private[client] def parseInstanceHostPort(instance: String): (String, Int) = {
-    val maybeInfos = instance.split(";")
-      .map(_.split("=", 2))
-      .filter(_.size == 2)
-      .map(i => (i(0), i(1)))
-      .toMap
-    if (maybeInfos.size > 0) {
-      (
-        maybeInfos.get("hive.server2.thrift.bind.host").get,
-        maybeInfos.get("hive.server2.thrift.port").get.toInt)
-    } else {
-      val strings = instance.split(":")
-      (strings(0), strings(1).toInt)
-    }
-  }
-
-  def createAndGetServiceNode(
-      conf: KyuubiConf,
-      zkClient: CuratorFramework,
-      namespace: String,
-      instance: String,
-      version: Option[String] = None,
-      external: Boolean = false): String = {
-    createServiceNode(conf, zkClient, namespace, instance, version, external).getActualPath
-  }
 }
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
deleted file mode 100644
index 98efe2399..000000000
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.ha.client.zookeeper
-
-import java.io.IOException
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
-
-import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.framework.recipes.nodes.PersistentNode
-import org.apache.curator.framework.state.ConnectionState
-import org.apache.curator.framework.state.ConnectionState.CONNECTED
-import org.apache.curator.framework.state.ConnectionState.LOST
-import org.apache.curator.framework.state.ConnectionState.RECONNECTED
-import org.apache.curator.framework.state.ConnectionStateListener
-import org.apache.curator.utils.ZKPaths
-import org.apache.zookeeper.CreateMode
-import org.apache.zookeeper.CreateMode.PERSISTENT
-import org.apache.zookeeper.KeeperException
-import org.apache.zookeeper.KeeperException.NodeExistsException
-import org.apache.zookeeper.WatchedEvent
-import org.apache.zookeeper.Watcher
-
-import org.apache.kyuubi.KYUUBI_VERSION
-import org.apache.kyuubi.KyuubiException
-import org.apache.kyuubi.Logging
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLISH_CONFIGS
-import org.apache.kyuubi.ha.client.ServiceDiscovery
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider.{buildZookeeperClient, getGracefulStopThreadDelay}
-import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.connectionChecker
-import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
-import org.apache.kyuubi.util.KyuubiHadoopUtils
-import org.apache.kyuubi.util.ThreadUtils
-
-class ServiceDiscoveryClient(serviceDiscovery: ServiceDiscovery) extends Logging {
-
-  /**
-   * a pre-defined namespace used to publish the instance of the associate service
-   */
-  protected var _namespace: String = _
-
-  private lazy val instance: String = serviceDiscovery.fe.connectionUrl
-  private var zkClient: CuratorFramework = _
-  private var serviceNode: PersistentNode = _
-
-  def namespace: String = _namespace
-
-  def createClient(conf: KyuubiConf): Unit = {
-    _namespace = conf.get(HA_ZK_NAMESPACE)
-    zkClient = buildZookeeperClient(conf)
-    zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener {
-      private val isConnected = new AtomicBoolean(false)
-
-      override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = {
-        info(s"Zookeeper client connection state changed to: $newState")
-        newState match {
-          case CONNECTED | RECONNECTED => isConnected.set(true)
-          case LOST =>
-            isConnected.set(false)
-            val delay = getGracefulStopThreadDelay(conf)
-            connectionChecker.schedule(
-              new Runnable {
-                override def run(): Unit = if (!isConnected.get()) {
-                  error(s"Zookeeper client connection state changed to: $newState, but failed to" +
-                    s" reconnect in ${delay / 1000} seconds. Give up retry and stop gracefully . ")
-                  serviceDiscovery.stopGracefully(true)
-                }
-              },
-              delay,
-              TimeUnit.MILLISECONDS)
-          case _ =>
-        }
-      }
-    })
-    zkClient.start()
-  }
-
-  def registerService(conf: KyuubiConf): Unit = {
-    serviceNode = createServiceNode(conf, zkClient, namespace, instance)
-    // Set a watch on the serviceNode
-    val watcher = new DeRegisterWatcher
-    if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
-      // No node exists, throw exception
-      throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
-        s"instance[${instance}] on ZooKeeper.")
-    }
-  }
-
-  /**
-   * Close the serviceNode if not closed yet
-   * and the znode will be deleted upon the serviceNode closed.
-   */
-  def deregisterService(): Unit = {
-    // close the EPHEMERAL_SEQUENTIAL node in zk
-    if (serviceNode != null) {
-      try {
-        serviceNode.close()
-      } catch {
-        case e @ (_: IOException | _: KeeperException) =>
-          error("Failed to close the persistent ephemeral znode" + serviceNode.getActualPath, e)
-      } finally {
-        serviceNode = null
-      }
-    }
-  }
-
-  def postDeregisterService(): Boolean = {
-    if (namespace != null) {
-      try {
-        zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
-        true
-      } catch {
-        case e: KeeperException =>
-          warn(s"Failed to delete $namespace", e)
-          false
-      }
-    } else {
-      false
-    }
-  }
-
-  def closeClient(): Unit = {
-    if (zkClient != null) zkClient.close()
-  }
-
-  class DeRegisterWatcher extends Watcher {
-    override def process(event: WatchedEvent): Unit = {
-      if (event.getType == Watcher.Event.EventType.NodeDeleted) {
-        warn(s"This Kyuubi instance ${instance} is now de-registered from" +
-          s" ZooKeeper. The server will be shut down after the last client session completes.")
-        serviceDiscovery.stopGracefully()
-      }
-    }
-  }
-}
-
-object ServiceDiscoveryClient extends Logging {
-  final private lazy val connectionChecker =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("zk-connection-checker")
-
-  private[client] def createServiceNode(
-      conf: KyuubiConf,
-      zkClient: CuratorFramework,
-      namespace: String,
-      instance: String,
-      version: Option[String] = None,
-      external: Boolean = false): PersistentNode = {
-    val ns = ZKPaths.makePath(null, namespace)
-    try {
-      zkClient
-        .create()
-        .creatingParentsIfNeeded()
-        .withMode(PERSISTENT)
-        .forPath(ns)
-    } catch {
-      case _: NodeExistsException => // do nothing
-      case e: KeeperException =>
-        throw new KyuubiException(s"Failed to create namespace '$ns'", e)
-    }
-
-    val session = conf.get(HA_ZK_ENGINE_REF_ID)
-      .map(refId => s"refId=$refId;").getOrElse("")
-    val pathPrefix = ZKPaths.makePath(
-      namespace,
-      s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
-    var serviceNode: PersistentNode = null
-    val createMode =
-      if (external) CreateMode.PERSISTENT_SEQUENTIAL
-      else CreateMode.EPHEMERAL_SEQUENTIAL
-    val znodeData =
-      if (conf.get(HA_ZK_PUBLISH_CONFIGS) && session.isEmpty) {
-        addConfsToPublish(conf, instance)
-      } else {
-        instance
-      }
-    try {
-      serviceNode = new PersistentNode(
-        zkClient,
-        createMode,
-        false,
-        pathPrefix,
-        znodeData.getBytes(StandardCharsets.UTF_8))
-      serviceNode.start()
-      val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT)
-      if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.MILLISECONDS)) {
-        throw new KyuubiException(s"Max znode creation wait time $znodeTimeout s exhausted")
-      }
-      info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for KyuubiServer uri: " + instance)
-    } catch {
-      case e: Exception =>
-        if (serviceNode != null) {
-          serviceNode.close()
-        }
-        throw new KyuubiException(
-          s"Unable to create a znode for this server instance: $instance",
-          e)
-    }
-    serviceNode
-  }
-
-  /**
-   * Refer to the implementation of HIVE-11581 to simplify user connection parameters.
-   * https://issues.apache.org/jira/browse/HIVE-11581
-   * HiveServer2 should store connection params in ZK
-   * when using dynamic service discovery for simpler client connection string.
-   */
-  private[client] def addConfsToPublish(conf: KyuubiConf, instance: String): String = {
-    if (!instance.contains(":")) {
-      return instance
-    }
-    val hostPort = instance.split(":", 2)
-    val confsToPublish = collection.mutable.Map[String, String]()
-
-    // Hostname
-    confsToPublish += ("hive.server2.thrift.bind.host" -> hostPort(0))
-    // Transport mode
-    confsToPublish += ("hive.server2.transport.mode" -> "binary")
-    // Transport specific confs
-    confsToPublish += ("hive.server2.thrift.port" -> hostPort(1))
-    confsToPublish += ("hive.server2.thrift.sasl.qop" -> conf.get(KyuubiConf.SASL_QOP))
-    // Auth specific confs
-    val authenticationMethod = conf.get(KyuubiConf.AUTHENTICATION_METHOD).mkString(",")
-    confsToPublish += ("hive.server2.authentication" -> authenticationMethod)
-    if (authenticationMethod.equalsIgnoreCase("KERBEROS")) {
-      confsToPublish += ("hive.server2.authentication.kerberos.principal" ->
-        conf.get(KyuubiConf.SERVER_PRINCIPAL).map(KyuubiHadoopUtils.getServerPrincipal)
-          .getOrElse(""))
-    }
-    confsToPublish.map { case (k, v) => k + "=" + v }.mkString(";")
-  }
-}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperACLProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
similarity index 86%
rename from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperACLProvider.scala
rename to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
index 654cb8f8d..40ead4633 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperACLProvider.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.ha.client
+package org.apache.kyuubi.ha.client.zookeeper
 
 import org.apache.curator.framework.api.ACLProvider
 import org.apache.zookeeper.ZooDefs
@@ -23,8 +23,9 @@ import org.apache.zookeeper.data.ACL
 
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf
+import org.apache.kyuubi.ha.client.AuthTypes
 
-class ZooKeeperACLProvider(conf: KyuubiConf) extends ACLProvider {
+class ZookeeperACLProvider(conf: KyuubiConf) extends ACLProvider {
 
   /**
    * Return the ACL list to use by default.
@@ -53,15 +54,15 @@ class ZooKeeperACLProvider(conf: KyuubiConf) extends ACLProvider {
     nodeAcls
   }
 
-  private def enabledServerAcls(): Boolean = ZooKeeperAuthTypes
+  private def enabledServerAcls(): Boolean = AuthTypes
     .withName(conf.get(HighAvailabilityConf.HA_ZK_AUTH_TYPE)) match {
-    case ZooKeeperAuthTypes.NONE => false
+    case AuthTypes.NONE => false
     case _ => true
   }
 
-  private def enabledEngineAcls(): Boolean = ZooKeeperAuthTypes
+  private def enabledEngineAcls(): Boolean = AuthTypes
     .withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE)) match {
-    case ZooKeeperAuthTypes.NONE => false
+    case AuthTypes.NONE => false
     case _ => true
   }
 
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
similarity index 85%
rename from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
rename to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
index fa7d366ed..31bf33837 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.ha.client
+package org.apache.kyuubi.ha.client.zookeeper
 
 import java.io.{File, IOException}
 import javax.security.auth.login.Configuration
@@ -30,15 +30,16 @@ import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManage
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf._
+import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.client.RetryPolicies
+import org.apache.kyuubi.ha.client.RetryPolicies._
 import org.apache.kyuubi.util.KyuubiHadoopUtils
 
-object ZooKeeperClientProvider extends Logging {
-
-  import RetryPolicies._
+object ZookeeperClientProvider extends Logging {
 
   /**
    * Create a [[CuratorFramework]] instance to be used as the ZooKeeper client
-   * Use the [[ZooKeeperACLProvider]] to create appropriate ACLs
+   * Use the [[ZookeeperACLProvider]] to create appropriate ACLs
    */
   def buildZookeeperClient(conf: KyuubiConf): CuratorFramework = {
     setUpZooKeeperAuth(conf)
@@ -61,7 +62,7 @@ object ZooKeeperClientProvider extends Logging {
       .connectString(connectionStr)
       .sessionTimeoutMs(sessionTimeout)
       .connectionTimeoutMs(connectionTimeout)
-      .aclProvider(new ZooKeeperACLProvider(conf))
+      .aclProvider(new ZookeeperACLProvider(conf))
       .retryPolicy(retryPolicy)
 
     conf.get(HA_ZK_AUTH_DIGEST) match {
@@ -94,23 +95,6 @@ object ZooKeeperClientProvider extends Logging {
     }
   }
 
-  /**
-   * Creates a zookeeper client before calling `f` and close it after calling `f`.
-   */
-  def withZkClient[T](conf: KyuubiConf)(f: CuratorFramework => T): T = {
-    val zkClient = buildZookeeperClient(conf)
-    try {
-      zkClient.start()
-      f(zkClient)
-    } finally {
-      try {
-        zkClient.close()
-      } catch {
-        case e: IOException => error("Failed to release the zkClient", e)
-      }
-    }
-  }
-
   /**
    * For a kerberized cluster, we dynamically set up the client's JAAS conf.
    *
@@ -136,10 +120,10 @@ object ZooKeeperClientProvider extends Logging {
     }
 
     if (conf.get(HA_ZK_ENGINE_REF_ID).isEmpty
-      && ZooKeeperAuthTypes.withName(conf.get(HA_ZK_AUTH_TYPE)) == ZooKeeperAuthTypes.KERBEROS) {
+      && AuthTypes.withName(conf.get(HA_ZK_AUTH_TYPE)) == AuthTypes.KERBEROS) {
       setupZkAuth()
-    } else if (conf.get(HA_ZK_ENGINE_REF_ID).nonEmpty && ZooKeeperAuthTypes
-        .withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == ZooKeeperAuthTypes.KERBEROS) {
+    } else if (conf.get(HA_ZK_ENGINE_REF_ID).nonEmpty && AuthTypes
+        .withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == AuthTypes.KERBEROS) {
       setupZkAuth()
     }
 
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
new file mode 100644
index 000000000..cb1c52945
--- /dev/null
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client.zookeeper
+
+import java.io.IOException
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
+import org.apache.curator.framework.recipes.nodes.PersistentNode
+import org.apache.curator.framework.state.ConnectionState
+import org.apache.curator.framework.state.ConnectionState.CONNECTED
+import org.apache.curator.framework.state.ConnectionState.LOST
+import org.apache.curator.framework.state.ConnectionState.RECONNECTED
+import org.apache.curator.framework.state.ConnectionStateListener
+import org.apache.curator.utils.ZKPaths
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.CreateMode.PERSISTENT
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.zookeeper.WatchedEvent
+import org.apache.zookeeper.Watcher
+
+import org.apache.kyuubi.KYUUBI_VERSION
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLISH_CONFIGS
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.ServiceDiscovery
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperClientProvider.buildZookeeperClient
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperClientProvider.getGracefulStopThreadDelay
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient.connectionChecker
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+import org.apache.kyuubi.util.ThreadUtils
+
+class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
+
+  private val zkClient: CuratorFramework = buildZookeeperClient(conf)
+  private var serviceNode: PersistentNode = _
+
+  def createClient(): Unit = {
+    zkClient.start()
+  }
+
+  def closeClient(): Unit = {
+    if (zkClient != null) {
+      zkClient.close()
+    }
+  }
+
+  def create(path: String, mode: String, createParent: Boolean = true): String = {
+    val builder =
+      if (createParent) zkClient.create().creatingParentsIfNeeded() else zkClient.create()
+    builder
+      .withMode(CreateMode.valueOf(mode))
+      .forPath(path)
+  }
+
+  def getData(path: String): Array[Byte] = {
+    zkClient.getData.forPath(path)
+  }
+
+  def getChildren(path: String): List[String] = {
+    zkClient.getChildren.forPath(path).asScala.toList
+  }
+
+  def pathExists(path: String): Boolean = {
+    zkClient.checkExists().forPath(path) != null
+  }
+
+  def pathNonExists(path: String): Boolean = {
+    zkClient.checkExists().forPath(path) == null
+  }
+
+  def delete(path: String, deleteChildren: Boolean = false): Unit = {
+    if (deleteChildren) {
+      zkClient.delete().deletingChildrenIfNeeded().forPath(path)
+    } else {
+      zkClient.delete().forPath(path)
+    }
+  }
+
+  def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
+    zkClient
+      .getConnectionStateListenable.addListener(new ConnectionStateListener {
+        private val isConnected = new AtomicBoolean(false)
+
+        override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = {
+          info(s"Zookeeper client connection state changed to: $newState")
+          newState match {
+            case CONNECTED | RECONNECTED => isConnected.set(true)
+            case LOST =>
+              isConnected.set(false)
+              val delay = getGracefulStopThreadDelay(conf)
+              connectionChecker.schedule(
+                new Runnable {
+                  override def run(): Unit = if (!isConnected.get()) {
+                    error(s"Zookeeper client connection state changed to: $newState," +
+                      s" but failed to reconnect in ${delay / 1000} seconds." +
+                      s" Give up retry and stop gracefully . ")
+                    serviceDiscovery.stopGracefully(true)
+                  }
+                },
+                delay,
+                TimeUnit.MILLISECONDS)
+            case _ =>
+          }
+        }
+      })
+  }
+
+  /** Runs `f` holding the lock at `lockPath`; fails if it is not acquired within `timeout`. */
+  def tryWithLock[T](
+      lockPath: String,
+      timeout: Long,
+      unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T = {
+    var lock: InterProcessSemaphoreMutex = null
+    try {
+      try {
+        lock = new InterProcessSemaphoreMutex(zkClient, lockPath)
+        // acquire returns false on timeout; without this check `f` would run unguarded
+        if (!lock.acquire(timeout, unit)) {
+          throw new IllegalStateException(s"Timeout acquiring lock after $timeout $unit")
+        }
+      } catch {
+        case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e)
+      }
+      f
+    } finally {
+      try {
+        if (lock != null) lock.release()
+      } catch {
+        case _: Exception => // best effort: release throws if the lock was never acquired
+      }
+    }
+  }
+
+  def getServerHost(namespace: String): Option[(String, Int)] = {
+    // TODO: only the last listed node is used, to avoid touching engines that may have crashed.
+    // We need a big improvement here.
+    getServiceNodesInfo(namespace, Some(1), silent = true) match {
+      case Seq(sn) => Some((sn.host, sn.port))
+      case _ => None
+    }
+  }
+
+  def getEngineByRefId(
+      namespace: String,
+      engineRefId: String): Option[(String, Int)] = {
+    getServiceNodesInfo(namespace, silent = true)
+      .find(_.engineRefId.exists(_.equals(engineRefId)))
+      .map(data => (data.host, data.port))
+  }
+
+  def getServiceNodesInfo(
+      namespace: String,
+      sizeOpt: Option[Int] = None,
+      silent: Boolean = false): Seq[ServiceNodeInfo] = {
+    try {
+      val hosts = zkClient.getChildren.forPath(namespace)
+      val size = sizeOpt.getOrElse(hosts.size())
+      hosts.asScala.takeRight(size).map { p =>
+        val path = ZKPaths.makePath(namespace, p)
+        val instance = new String(zkClient.getData.forPath(path), StandardCharsets.UTF_8)
+        val (host, port) = DiscoveryClient.parseInstanceHostPort(instance)
+        val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
+        val engineRefId = p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId="))
+        info(s"Get service instance:$instance and version:$version under $namespace")
+        ServiceNodeInfo(namespace, p, host, port, version, engineRefId)
+      }
+    } catch {
+      case _: Exception if silent => Nil
+      case e: Exception =>
+        error(s"Failed to get service node info", e)
+        Nil
+    }
+  }
+
+  def registerService(
+      conf: KyuubiConf,
+      namespace: String,
+      serviceDiscovery: ServiceDiscovery,
+      version: Option[String] = None,
+      external: Boolean = false): Unit = {
+    val instance = serviceDiscovery.fe.connectionUrl
+    val watcher = new DeRegisterWatcher(instance, serviceDiscovery)
+    serviceNode = createPersistentNode(conf, namespace, instance, version, external)
+    // Set a watch on the serviceNode
+    if (zkClient.checkExists
+        .usingWatcher(watcher.asInstanceOf[Watcher]).forPath(serviceNode.getActualPath) == null) {
+      // No node exists, throw exception
+      throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
+        s"instance[${instance}] on ZooKeeper.")
+    }
+  }
+
+  /** Closes the registered service node in zk, if any, and always clears the reference to it. */
+  def deregisterService(): Unit = {
+    if (serviceNode != null) {
+      try {
+        serviceNode.close()
+      } catch {
+        case e @ (_: IOException | _: KeeperException) =>
+          error(s"Failed to close the persistent ephemeral znode ${serviceNode.getActualPath}", e)
+      } finally {
+        serviceNode = null
+      }
+    }
+  }
+
+  def postDeregisterService(namespace: String): Boolean = {
+    if (namespace != null) {
+      try {
+        delete(namespace, true)
+        true
+      } catch {
+        case e: KeeperException =>
+          warn(s"Failed to delete $namespace", e)
+          false
+      }
+    } else {
+      false
+    }
+  }
+
+  def createAndGetServiceNode(
+      conf: KyuubiConf,
+      namespace: String,
+      instance: String,
+      version: Option[String] = None,
+      external: Boolean = false): String = {
+    createPersistentNode(conf, namespace, instance, version, external).getActualPath
+  }
+
+  @VisibleForTesting
+  def startSecretNode(
+      createMode: String,
+      basePath: String,
+      initData: String,
+      useProtection: Boolean = false): Unit = {
+    val secretNode = new PersistentNode(
+      zkClient,
+      CreateMode.valueOf(createMode),
+      useProtection,
+      basePath,
+      initData.getBytes(StandardCharsets.UTF_8))
+    secretNode.start()
+  }
+
+  /**
+   * Builds the znode payload that publishes connection parameters alongside the instance
+   * address, following the approach of HIVE-11581
+   * (https://issues.apache.org/jira/browse/HIVE-11581): HiveServer2 stores connection params
+   * in ZK when dynamic service discovery is used, so clients can use a simpler connection
+   * string.
+   *
+   * @param conf configuration the SASL QOP, authentication method and server principal are
+   *             read from
+   * @param instance instance address; returned untouched when it contains no ':'
+   * @return `key=value` pairs joined with ';', or `instance` itself when it has no ':'
+   */
+  private[client] def addConfsToPublish(conf: KyuubiConf, instance: String): String = {
+    if (instance.contains(":")) {
+      // limit of 2: everything after the first ':' is treated as the port
+      val Array(host, port) = instance.split(":", 2)
+      val published = collection.mutable.Map[String, String]()
+
+      // Hostname
+      published += ("hive.server2.thrift.bind.host" -> host)
+      // Transport mode
+      published += ("hive.server2.transport.mode" -> "binary")
+      // Transport specific confs
+      published += ("hive.server2.thrift.port" -> port)
+      published += ("hive.server2.thrift.sasl.qop" -> conf.get(KyuubiConf.SASL_QOP))
+      // Auth specific confs
+      val authMethods = conf.get(KyuubiConf.AUTHENTICATION_METHOD).mkString(",")
+      published += ("hive.server2.authentication" -> authMethods)
+      // NOTE(review): the comparison is against the comma-joined value, so the principal is
+      // only published when KERBEROS is the sole configured method -- confirm this is intended.
+      if (authMethods.equalsIgnoreCase("KERBEROS")) {
+        val principal = conf.get(KyuubiConf.SERVER_PRINCIPAL)
+          .map(KyuubiHadoopUtils.getServerPrincipal)
+          .getOrElse("")
+        published += ("hive.server2.authentication.kerberos.principal" -> principal)
+      }
+      published.map { case (key, value) => s"$key=$value" }.mkString(";")
+    } else {
+      instance
+    }
+  }
+
+  /**
+   * Ensures the namespace exists and creates the sequential znode advertising `instance`.
+   *
+   * The namespace is created as a PERSISTENT node (parents included) if it is missing. The
+   * service node is then created under it with a name of the form
+   * `serviceUri=<instance>;version=<version>;[refId=<id>;]sequence=`, as EPHEMERAL_SEQUENTIAL,
+   * or PERSISTENT_SEQUENTIAL when `external` is true.
+   *
+   * @param conf configuration providing the engine ref id, the publish-configs switch and the
+   *             znode creation timeout
+   * @param namespace namespace the node is created under
+   * @param instance address stored in the node name and data
+   * @param version version to advertise; `KYUUBI_VERSION` is used when None
+   * @param external whether to create a PERSISTENT_SEQUENTIAL node instead of an
+   *                 EPHEMERAL_SEQUENTIAL one
+   * @return the started node
+   * @throws KyuubiException if creating the namespace fails with a `KeeperException`, if the
+   *                         node cannot be created, or if it is not created within the
+   *                         configured timeout
+   */
+  private def createPersistentNode(
+      conf: KyuubiConf,
+      namespace: String,
+      instance: String,
+      version: Option[String] = None,
+      external: Boolean = false): PersistentNode = {
+    val ns = ZKPaths.makePath(null, namespace)
+    try {
+      zkClient
+        .create()
+        .creatingParentsIfNeeded()
+        .withMode(PERSISTENT)
+        .forPath(ns)
+    } catch {
+      case _: NodeExistsException => // do nothing
+      case e: KeeperException =>
+        throw new KyuubiException(s"Failed to create namespace '$ns'", e)
+    }
+
+    // non-empty only when a ref id is configured; it also switches off config publishing below
+    val session = conf.get(HA_ZK_ENGINE_REF_ID)
+      .map(refId => s"refId=$refId;").getOrElse("")
+    val pathPrefix = ZKPaths.makePath(
+      namespace,
+      s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
+    var localServiceNode: PersistentNode = null
+    val createMode =
+      if (external) CreateMode.PERSISTENT_SEQUENTIAL
+      else CreateMode.EPHEMERAL_SEQUENTIAL
+    val znodeData =
+      if (conf.get(HA_ZK_PUBLISH_CONFIGS) && session.isEmpty) {
+        addConfsToPublish(conf, instance)
+      } else {
+        instance
+      }
+    try {
+      localServiceNode = new PersistentNode(
+        zkClient,
+        createMode,
+        false,
+        pathPrefix,
+        znodeData.getBytes(StandardCharsets.UTF_8))
+      localServiceNode.start()
+      val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT)
+      if (!localServiceNode.waitForInitialCreate(znodeTimeout, TimeUnit.MILLISECONDS)) {
+        // the wait above is in milliseconds, so report the timeout with that unit
+        throw new KyuubiException(s"Max znode creation wait time $znodeTimeout ms exhausted")
+      }
+      info(s"Created a ${localServiceNode.getActualPath} on ZooKeeper for KyuubiServer uri:" +
+        s" $instance")
+    } catch {
+      case e: Exception =>
+        // release the half-created node before surfacing the failure
+        if (localServiceNode != null) {
+          localServiceNode.close()
+        }
+        throw new KyuubiException(
+          s"Unable to create a znode for this server instance: $instance",
+          e)
+    }
+    localServiceNode
+  }
+
+  /**
+   * Watcher that stops `serviceDiscovery` gracefully once the watched znode is deleted.
+   * Every other event type is ignored.
+   */
+  class DeRegisterWatcher(instance: String, serviceDiscovery: ServiceDiscovery) extends Watcher {
+    override def process(event: WatchedEvent): Unit = event.getType match {
+      case Watcher.Event.EventType.NodeDeleted =>
+        warn(s"This Kyuubi instance ${instance} is now de-registered from" +
+          s" ZooKeeper. The server will be shut down after the last client session completes.")
+        serviceDiscovery.stopGracefully()
+      case _ => // nothing to do for other event types
+    }
+  }
+}
+
+/**
+ * Object-level state for the ZooKeeper discovery client.
+ */
+object ZookeeperDiscoveryClient extends Logging {
+  // Created on first access only. NOTE(review): the argument is presumably the thread name and
+  // the executor is presumably used for periodic connection checks -- its usage is not visible
+  // here, confirm against the callers.
+  final private lazy val connectionChecker =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("zk-connection-checker")
+}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala
index 037307951..75af21e3a 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala
@@ -21,10 +21,10 @@ import java.nio.charset.StandardCharsets
 
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SECURE_SECRET_NODE
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
 
 class ZooKeeperEngineSecuritySecretProviderImpl extends EngineSecuritySecretProvider {
-  import ZooKeeperClientProvider._
+  import DiscoveryClientProvider._
 
   private var conf: KyuubiConf = _
 
@@ -34,8 +34,8 @@ class ZooKeeperEngineSecuritySecretProviderImpl extends EngineSecuritySecretProv
 
   override def getSecret(): String = {
     conf.get(HA_ZK_ENGINE_SECURE_SECRET_NODE).map { zkNode =>
-      withZkClient[String](conf) { zkClient =>
-        new String(zkClient.getData.forPath(zkNode), StandardCharsets.UTF_8)
+      withDiscoveryClient[String](conf) { discoveryClient =>
+        new String(discoveryClient.getData(zkNode), StandardCharsets.UTF_8)
       }
     }.getOrElse(
       throw new IllegalArgumentException(s"${HA_ZK_ENGINE_SECURE_SECRET_NODE.key} is not defined"))
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala
similarity index 65%
rename from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala
rename to kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala
index da28db730..ca2a4ba88 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala
@@ -17,10 +17,17 @@
 
 package org.apache.kyuubi.ha.client
 
-object ZooKeeperAuthTypes extends Enumeration {
-
-  type ZooKeeperAuthType = Value
-
-  val NONE, KERBEROS, DIGEST = Value
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
 
+class DiscoveryClientProviderSuite extends KyuubiFunSuite {
+  test("discovery") {
+    val conf = KyuubiConf()
+    DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient =>
+      discoveryClient.getServerHost("/kyuubi")
+    }
+    DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient =>
+      discoveryClient.getServerHost("/kyuubi")
+    }
+  }
 }
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProviderSuite.scala
similarity index 69%
rename from kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala
rename to kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProviderSuite.scala
index 97cbdf8a5..48fccfe98 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProviderSuite.scala
@@ -15,13 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.ha.client
+package org.apache.kyuubi.ha.client.zookeeper
 
 import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_CONN_BASE_RETRY_WAIT, HA_ZK_CONN_MAX_RETRIES, HA_ZK_CONN_MAX_RETRY_WAIT, HA_ZK_CONN_RETRY_POLICY}
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_BASE_RETRY_WAIT
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRIES
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRY_WAIT
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
 
-class ZooKeeperClientProviderSuite extends KyuubiFunSuite {
+class ZookeeperClientProviderSuite extends KyuubiFunSuite {
 
   test("get graceful stop thread start delay") {
     val conf = KyuubiConf()
@@ -29,23 +32,23 @@ class ZooKeeperClientProviderSuite extends KyuubiFunSuite {
     val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT)
     val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
     val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
-    val delay1 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+    val delay1 = ZookeeperClientProvider.getGracefulStopThreadDelay(conf)
     assert(delay1 >= baseSleepTime * maxRetries)
 
     conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
-    val delay2 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+    val delay2 = ZookeeperClientProvider.getGracefulStopThreadDelay(conf)
     assert(delay2 === baseSleepTime)
 
     conf.set(HA_ZK_CONN_RETRY_POLICY, "N_TIME")
-    val delay3 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+    val delay3 = ZookeeperClientProvider.getGracefulStopThreadDelay(conf)
     assert(delay3 === baseSleepTime * maxRetries)
 
     conf.set(HA_ZK_CONN_RETRY_POLICY, "UNTIL_ELAPSED")
-    val delay4 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+    val delay4 = ZookeeperClientProvider.getGracefulStopThreadDelay(conf)
     assert(delay4 === maxSleepTime)
 
     conf.set(HA_ZK_CONN_RETRY_POLICY, "BOUNDED_EXPONENTIAL_BACKOFF")
-    val delay5 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+    val delay5 = ZookeeperClientProvider.getGracefulStopThreadDelay(conf)
     assert(delay5 >= baseSleepTime * maxRetries)
   }
 }
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
similarity index 83%
rename from kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
rename to kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index 182bf56e4..30cf1aeab 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.ha.client
+package org.apache.kyuubi.ha.client.zookeeper
 
 import java.io.{File, IOException}
 import java.net.InetAddress
@@ -33,11 +33,16 @@ import org.scalatest.time.SpanSugar._
 import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf._
+import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
+import org.apache.kyuubi.ha.client.EngineServiceDiscovery
+import org.apache.kyuubi.ha.client.KyuubiServiceDiscovery
 import org.apache.kyuubi.service._
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
-class ServiceDiscoverySuite extends KerberizedTestHelper {
-  import ZooKeeperClientProvider._
+class ZookeeperDiscoveryClientSuite extends KerberizedTestHelper {
+  import DiscoveryClientProvider._
 
   val zkServer = new EmbeddedZookeeper()
   val conf: KyuubiConf = KyuubiConf()
@@ -80,17 +85,17 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
     server.initialize(conf)
     server.start()
     val znodeRoot = s"/$namespace"
-    withZkClient(conf) { framework =>
+    withDiscoveryClient(conf) { framework =>
       try {
-        assert(framework.checkExists().forPath("/abc") === null)
-        assert(framework.checkExists().forPath(znodeRoot) !== null)
-        val children = framework.getChildren.forPath(znodeRoot).asScala
+        assert(framework.pathNonExists("/abc"))
+        assert(framework.pathExists(znodeRoot))
+        val children = framework.getChildren(znodeRoot)
         assert(children.head ===
           s"serviceUri=${server.frontendServices.head.connectionUrl};" +
           s"version=$KYUUBI_VERSION;sequence=0000000000")
 
         children.foreach { child =>
-          framework.delete().forPath(s"""$znodeRoot/$child""")
+          framework.delete(s"""$znodeRoot/$child""")
         }
         eventually(timeout(5.seconds), interval(100.millis)) {
           assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
@@ -112,21 +117,21 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
       assert(actual === expected)
     }
 
-    val acl = new ZooKeeperACLProvider(conf).getDefaultAcl
+    val acl = new ZookeeperACLProvider(conf).getDefaultAcl
     assertACL(expectedNoACL, acl)
 
-    val serverConf = conf.clone.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.KERBEROS.toString)
-    val serverACL = new ZooKeeperACLProvider(serverConf).getDefaultAcl
+    val serverConf = conf.clone.set(HA_ZK_AUTH_TYPE, AuthTypes.KERBEROS.toString)
+    val serverACL = new ZookeeperACLProvider(serverConf).getDefaultAcl
     assertACL(expectedEnableACL, serverACL)
 
     val engineConf = serverConf.clone.set(HA_ZK_ENGINE_REF_ID, "ref")
-    engineConf.set(HA_ZK_ENGINE_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
-    val engineACL = new ZooKeeperACLProvider(engineConf).getDefaultAcl
+    engineConf.set(HA_ZK_ENGINE_AUTH_TYPE, AuthTypes.NONE.toString)
+    val engineACL = new ZookeeperACLProvider(engineConf).getDefaultAcl
     assertACL(expectedNoACL, engineACL)
 
     val enableEngineACLConf = serverConf.clone.set(HA_ZK_ENGINE_REF_ID, "ref")
-    enableEngineACLConf.set(HA_ZK_ENGINE_AUTH_TYPE, ZooKeeperAuthTypes.KERBEROS.toString)
-    val enableEngineACL = new ZooKeeperACLProvider(enableEngineACLConf).getDefaultAcl
+    enableEngineACLConf.set(HA_ZK_ENGINE_AUTH_TYPE, AuthTypes.KERBEROS.toString)
+    val enableEngineACL = new ZookeeperACLProvider(enableEngineACLConf).getDefaultAcl
     assertACL(expectedEnableACL, enableEngineACL)
   }
 
@@ -137,9 +142,9 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
 
       conf.set(HA_ZK_AUTH_KEYTAB.key, keytab.getCanonicalPath)
       conf.set(HA_ZK_AUTH_PRINCIPAL.key, principal)
-      conf.set(HA_ZK_AUTH_TYPE.key, ZooKeeperAuthTypes.KERBEROS.toString)
+      conf.set(HA_ZK_AUTH_TYPE.key, AuthTypes.KERBEROS.toString)
 
-      ZooKeeperClientProvider.setUpZooKeeperAuth(conf)
+      ZookeeperClientProvider.setUpZooKeeperAuth(conf)
       val configuration = Configuration.getConfiguration
       val entries = configuration.getAppConfigurationEntry("KyuubiZooKeeperClient")
 
@@ -151,7 +156,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
       assert(options("useKeyTab").toString.toBoolean)
 
       conf.set(HA_ZK_AUTH_KEYTAB.key, s"${keytab.getName}")
-      val e = intercept[IOException](ZooKeeperClientProvider.setUpZooKeeperAuth(conf))
+      val e = intercept[IOException](ZookeeperClientProvider.setUpZooKeeperAuth(conf))
       assert(e.getMessage === s"${HA_ZK_AUTH_KEYTAB.key} does not exists")
     }
   }
@@ -167,7 +172,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
         .set(HA_ZK_QUORUM, zkServer.getConnectString)
         .set(HA_ZK_NAMESPACE, namespace)
         .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
-        .set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
+        .set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
 
       var serviceDiscovery: KyuubiServiceDiscovery = null
       val server: Serverable = new NoopTBinaryFrontendServer() {
@@ -183,18 +188,18 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
       server.start()
 
       val znodeRoot = s"/$namespace"
-      withZkClient(conf) { framework =>
+      withDiscoveryClient(conf) { framework =>
         try {
 
-          assert(framework.checkExists().forPath("/abc") === null)
-          assert(framework.checkExists().forPath(znodeRoot) !== null)
-          val children = framework.getChildren.forPath(znodeRoot).asScala
+          assert(framework.pathNonExists("/abc"))
+          assert(framework.pathExists(znodeRoot))
+          val children = framework.getChildren(znodeRoot)
           assert(children.head ===
             s"serviceUri=${server.frontendServices.head.connectionUrl};" +
             s"version=$KYUUBI_VERSION;sequence=0000000000")
 
           children.foreach { child =>
-            framework.delete().forPath(s"""$znodeRoot/$child""")
+            framework.delete(s"""$znodeRoot/$child""")
           }
           eventually(timeout(5.seconds), interval(100.millis)) {
             assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
@@ -216,7 +221,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
     val host = "127.0.0.1"
     val port = 10009
     val instance1 = s"$host:$port"
-    val (host1, port1) = ServiceDiscovery.parseInstanceHostPort(instance1)
+    val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
     assert(host === host1)
     assert(port === port1)
 
@@ -224,7 +229,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
       s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
       s"hive.server2.thrift.port=$port;" +
       s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
-    val (host2, port2) = ServiceDiscovery.parseInstanceHostPort(instance2)
+    val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
     assert(host === host2)
     assert(port === port2)
   }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index e0b0a1c6c..894acfdbb 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -24,9 +24,6 @@ import scala.util.Random
 
 import com.codahale.metrics.MetricRegistry
 import com.google.common.annotations.VisibleForTesting
-import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
-import org.apache.curator.utils.ZKPaths
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
@@ -41,8 +38,8 @@ import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
-import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineByRefId
-import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryPaths
 import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.log.OperationLog
@@ -135,8 +132,8 @@ private[kyuubi] class EngineRef(
   private[kyuubi] lazy val engineSpace: String = {
     val commonParent = s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType"
     shareLevel match {
-      case CONNECTION => ZKPaths.makePath(commonParent, appUser, engineRefId)
-      case _ => ZKPaths.makePath(commonParent, appUser, subdomain)
+      case CONNECTION => DiscoveryPaths.makePath(commonParent, appUser, Array(engineRefId))
+      case _ => DiscoveryPaths.makePath(commonParent, appUser, Array(subdomain))
     }
   }
 
@@ -144,38 +141,23 @@ private[kyuubi] class EngineRef(
    * The distributed lock path used to ensure only once engine being created for non-CONNECTION
    * share level.
    */
-  private def tryWithLock[T](zkClient: CuratorFramework)(f: => T): T = shareLevel match {
-    case CONNECTION => f
-    case _ =>
-      val lockPath =
-        ZKPaths.makePath(s"${serverSpace}_$shareLevel", "lock", appUser, subdomain)
-      var lock: InterProcessSemaphoreMutex = null
-      try {
-        try {
-          lock = new InterProcessSemaphoreMutex(zkClient, lockPath)
-          // Acquire a lease. If no leases are available, this method blocks until either the
-          // maximum number of leases is increased or another client/process closes a lease
-          lock.acquire(timeout, TimeUnit.MILLISECONDS)
-        } catch {
-          case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e)
-        }
-        f
-      } finally {
-        try {
-          if (lock != null) {
-            lock.release()
-          }
-        } catch {
-          case _: Exception =>
-        }
-      }
-  }
+  private def tryWithLock[T](discoveryClient: DiscoveryClient)(f: => T): T =
+    shareLevel match {
+      case CONNECTION => f
+      case _ =>
+        val lockPath =
+          DiscoveryPaths.makePath(
+            s"${serverSpace}_$shareLevel",
+            "lock",
+            Array(appUser, subdomain))
+        discoveryClient.tryWithLock(lockPath, timeout, TimeUnit.MILLISECONDS)(f)
+    }
 
   private def create(
-      zkClient: CuratorFramework,
-      extraEngineLog: Option[OperationLog]): (String, Int) = tryWithLock(zkClient) {
+      discoveryClient: DiscoveryClient,
+      extraEngineLog: Option[OperationLog]): (String, Int) = tryWithLock(discoveryClient) {
     // Get the engine address ahead if another process has succeeded
-    var engineRef = getServerHost(zkClient, engineSpace)
+    var engineRef = discoveryClient.getServerHost(engineSpace)
     if (engineRef.nonEmpty) return engineRef.get
 
     conf.set(HA_ZK_NAMESPACE, engineSpace)
@@ -228,7 +210,7 @@ private[kyuubi] class EngineRef(
             s"Timeout($timeout ms) to launched $engineType engine with $builder. $killMessage",
             builder.getError)
         }
-        engineRef = getEngineByRefId(zkClient, engineSpace, engineRefId)
+        engineRef = discoveryClient.getEngineByRefId(engineSpace, engineRefId)
       }
       engineRef.get
     } finally {
@@ -241,15 +223,15 @@ private[kyuubi] class EngineRef(
   /**
    * Get the engine ref from engine space first or create a new one
    *
-   * @param zkClient the zookeeper client to get or create engine instance
+   * @param discoveryClient the discovery client used to get or create the engine instance
    * @param extraEngineLog the launch engine operation log, used to inject engine log into it
    */
   def getOrCreate(
-      zkClient: CuratorFramework,
+      discoveryClient: DiscoveryClient,
       extraEngineLog: Option[OperationLog] = None): (String, Int) = {
-    getServerHost(zkClient, engineSpace)
+    discoveryClient.getServerHost(engineSpace)
       .getOrElse {
-        create(zkClient, extraEngineLog)
+        create(discoveryClient, extraEngineLog)
       }
   }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 7b5b70d72..b4ed17155 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
 import org.apache.kyuubi.engine.ProcBuilder
 import org.apache.kyuubi.ha.HighAvailabilityConf
-import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
+import org.apache.kyuubi.ha.client.AuthTypes
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.util.KyuubiHadoopUtils
 
@@ -93,8 +93,8 @@ class SparkProcessBuilder(
     var allConf = conf.getAll
 
     // if enable sasl kerberos authentication for zookeeper, need to upload the server ketab file
-    if (ZooKeeperAuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE))
-        == ZooKeeperAuthTypes.KERBEROS) {
+    if (AuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE))
+        == AuthTypes.KERBEROS) {
       allConf = allConf ++ zkAuthKeytabFileConf(allConf)
     }
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 9c461cdf7..341e9d0fd 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -22,9 +22,7 @@ import java.util
 
 import scala.util.Properties
 
-import org.apache.curator.utils.ZKPaths
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.zookeeper.CreateMode.PERSISTENT
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.NodeExistsException
 
@@ -35,8 +33,10 @@ import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
 import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent, KyuubiServerInfoEvent}
 import org.apache.kyuubi.events.handler.ServerJsonLoggingEventHandler
 import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.{ServiceDiscovery, ZooKeeperAuthTypes}
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
+import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
+import org.apache.kyuubi.ha.client.DiscoveryPaths
+import org.apache.kyuubi.ha.client.ServiceDiscovery
 import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
 import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState}
 import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
@@ -51,7 +51,7 @@ object KyuubiServer extends Logging {
       zkServer.initialize(conf)
       zkServer.start()
       conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
-      conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
+      conf.set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
     } else {
       // create chroot path if necessary
       val connectionStr = conf.get(HA_ZK_QUORUM)
@@ -71,15 +71,11 @@ object KyuubiServer extends Logging {
       chrootOption.foreach { chroot =>
         val zkConnectionForChrootCreation = connectionStr.substring(0, chrootIndex)
         val overrideQuorumConf = conf.clone.set(HA_ZK_QUORUM, zkConnectionForChrootCreation)
-        withZkClient(overrideQuorumConf) { zkClient =>
-          if (zkClient.checkExists().forPath(chroot) == null) {
-            val chrootPath = ZKPaths.makePath(null, chroot)
+        withDiscoveryClient(overrideQuorumConf) { discoveryClient =>
+          if (discoveryClient.pathNonExists(chroot)) {
+            val chrootPath = DiscoveryPaths.makePath(null, chroot)
             try {
-              zkClient
-                .create()
-                .creatingParentsIfNeeded()
-                .withMode(PERSISTENT)
-                .forPath(chrootPath)
+              discoveryClient.create(chrootPath, "PERSISTENT")
             } catch {
               case _: NodeExistsException => // do nothing
               case e: KeeperException =>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 1c4c09eb4..cec83ac1b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -28,7 +28,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.EngineRef
 import org.apache.kyuubi.events.{EventBus, KyuubiEvent, KyuubiSessionEvent}
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationState}
@@ -93,8 +93,8 @@ class KyuubiSessionImpl(
   }
 
   private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = {
-    withZkClient(sessionConf) { zkClient =>
-      val (host, port) = engine.getOrCreate(zkClient, extraEngineLog)
+    withDiscoveryClient(sessionConf) { discoveryClient =>
+      val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
       val passwd =
         if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
           EngineSecurityAccessor.get().issueToken()
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
index d5d9dbae1..e83551bab 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
@@ -21,7 +21,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
 import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_AUTH_TYPE, HA_ZK_QUORUM}
-import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
+import org.apache.kyuubi.ha.client.AuthTypes
 import org.apache.kyuubi.server.KyuubiServer
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
@@ -48,7 +48,7 @@ trait WithKyuubiServer extends KyuubiFunSuite {
     zkServer.initialize(conf)
     zkServer.start()
     conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
-    conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
+    conf.set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
 
     conf.set("spark.ui.enabled", "false")
     conf.setIfMissing("spark.sql.catalogImplementation", "in-memory")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
index 861dd0f02..8b82db990 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine
 
 import java.util.UUID
 
-import org.apache.curator.utils.ZKPaths
 import org.apache.hadoop.security.UserGroupInformation
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
@@ -27,7 +26,8 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.ha.HighAvailabilityConf
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryPaths
 import org.apache.kyuubi.util.NamedThreadFactory
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
@@ -67,7 +67,10 @@ class EngineRefSuite extends KyuubiFunSuite {
       domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, _))
       val engine = new EngineRef(conf, user, id)
       assert(engine.engineSpace ===
-        ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${CONNECTION}_${engineType}", user, id))
+        DiscoveryPaths.makePath(
+          s"kyuubi_${KYUUBI_VERSION}_${CONNECTION}_${engineType}",
+          user,
+          Array(id)))
       assert(engine.defaultEngineName === s"kyuubi_${CONNECTION}_${engineType}_${user}_$id")
     }
   }
@@ -78,7 +81,10 @@ class EngineRefSuite extends KyuubiFunSuite {
     conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
     val appName = new EngineRef(conf, user, id)
     assert(appName.engineSpace ===
-      ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${USER}_$FLINK_SQL", user, "default"))
+      DiscoveryPaths.makePath(
+        s"kyuubi_${KYUUBI_VERSION}_${USER}_$FLINK_SQL",
+        user,
+        Array("default")))
     assert(appName.defaultEngineName === s"kyuubi_${USER}_${FLINK_SQL}_${user}_default_$id")
 
     Seq(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN, KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN).foreach {
@@ -87,7 +93,10 @@ class EngineRefSuite extends KyuubiFunSuite {
         conf.set(k.key, "abc")
         val appName2 = new EngineRef(conf, user, id)
         assert(appName2.engineSpace ===
-          ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${USER}_${FLINK_SQL}", user, "abc"))
+          DiscoveryPaths.makePath(
+            s"kyuubi_${KYUUBI_VERSION}_${USER}_${FLINK_SQL}",
+            user,
+            Array("abc")))
         assert(appName2.defaultEngineName === s"kyuubi_${USER}_${FLINK_SQL}_${user}_abc_$id")
     }
   }
@@ -99,7 +108,10 @@ class EngineRefSuite extends KyuubiFunSuite {
     val engineRef = new EngineRef(conf, user, id)
     val primaryGroupName = UserGroupInformation.createRemoteUser(user).getPrimaryGroupName
     assert(engineRef.engineSpace ===
-      ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL", primaryGroupName, "default"))
+      DiscoveryPaths.makePath(
+        s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL",
+        primaryGroupName,
+        Array("default")))
     assert(engineRef.defaultEngineName ===
       s"kyuubi_GROUP_SPARK_SQL_${primaryGroupName}_default_$id")
 
@@ -109,10 +121,10 @@ class EngineRefSuite extends KyuubiFunSuite {
         conf.set(k.key, "abc")
         val engineRef2 = new EngineRef(conf, user, id)
         assert(engineRef2.engineSpace ===
-          ZKPaths.makePath(
+          DiscoveryPaths.makePath(
             s"kyuubi_${KYUUBI_VERSION}_${GROUP}_${SPARK_SQL}",
             primaryGroupName,
-            "abc"))
+            Array("abc")))
         assert(engineRef2.defaultEngineName ===
           s"kyuubi_${GROUP}_${SPARK_SQL}_${primaryGroupName}_abc_$id")
     }
@@ -122,7 +134,10 @@ class EngineRefSuite extends KyuubiFunSuite {
     assert(newUGI.getGroupNames.isEmpty)
     val engineRef3 = new EngineRef(conf, userName, id)
     assert(engineRef3.engineSpace ===
-      ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL", userName, "abc"))
+      DiscoveryPaths.makePath(
+        s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL",
+        userName,
+        Array("abc")))
     assert(engineRef3.defaultEngineName === s"kyuubi_GROUP_SPARK_SQL_${userName}_abc_$id")
   }
 
@@ -132,13 +147,19 @@ class EngineRefSuite extends KyuubiFunSuite {
     conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
     val appName = new EngineRef(conf, user, id)
     assert(appName.engineSpace ===
-      ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}", user, "default"))
+      DiscoveryPaths.makePath(
+        s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}",
+        user,
+        Array("default")))
     assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_default_$id")
 
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
     val appName2 = new EngineRef(conf, user, id)
     assert(appName2.engineSpace ===
-      ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}", user, "abc"))
+      DiscoveryPaths.makePath(
+        s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}",
+        user,
+        Array("abc")))
     assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_abc_$id")
   }
 
@@ -200,7 +221,7 @@ class EngineRefSuite extends KyuubiFunSuite {
 
     val r1 = new Runnable {
       override def run(): Unit = {
-        ZooKeeperClientProvider.withZkClient(conf) { client =>
+        DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
           val hp = engine.getOrCreate(client)
           port1 = hp._2
         }
@@ -209,7 +230,7 @@ class EngineRefSuite extends KyuubiFunSuite {
 
     val r2 = new Runnable {
       override def run(): Unit = {
-        ZooKeeperClientProvider.withZkClient(conf) { client =>
+        DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
           val hp = engine.getOrCreate(client)
           port2 = hp._2
         }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 45ff4b3bd..46525436c 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_LOG_TIMEOUT, ENGINE_SPARK_MAIN_RESOURCE}
 import org.apache.kyuubi.ha.HighAvailabilityConf
-import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
+import org.apache.kyuubi.ha.client.AuthTypes
 import org.apache.kyuubi.service.ServiceUtils
 
 class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
@@ -281,7 +281,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
 
   test("zookeeper kerberos authentication") {
     val conf = KyuubiConf()
-    conf.set(HighAvailabilityConf.HA_ZK_AUTH_TYPE.key, ZooKeeperAuthTypes.KERBEROS.toString)
+    conf.set(HighAvailabilityConf.HA_ZK_AUTH_TYPE.key, AuthTypes.KERBEROS.toString)
     conf.set(HighAvailabilityConf.HA_ZK_AUTH_KEYTAB.key, testKeytab)
     conf.set(HighAvailabilityConf.HA_ZK_AUTH_PRINCIPAL.key, testPrincipal)
 
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala
index ae5d611e0..5face684f 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala
@@ -17,19 +17,14 @@
 
 package org.apache.kyuubi.operation
 
-import java.nio.charset.StandardCharsets
-
-import org.apache.curator.framework.recipes.nodes.PersistentNode
-import org.apache.zookeeper.CreateMode
-
 import org.apache.kyuubi.WithKyuubiServer
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
 import org.apache.kyuubi.service.authentication.{EngineSecurityAccessor, ZooKeeperEngineSecuritySecretProviderImpl}
 
 class KyuubiOperationWithEngineSecurity extends WithKyuubiServer with HiveJDBCTestHelper {
-  import ZooKeeperClientProvider._
+  import DiscoveryClientProvider._
 
   override protected def jdbcUrl: String = getJdbcUrl
 
@@ -46,15 +41,9 @@ class KyuubiOperationWithEngineSecurity extends WithKyuubiServer with HiveJDBCTe
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    withZkClient(conf) { zkClient =>
-      zkClient.create().withMode(CreateMode.PERSISTENT).forPath(engineSecretNode)
-      val secretNode = new PersistentNode(
-        zkClient,
-        CreateMode.PERSISTENT,
-        false,
-        engineSecretNode,
-        "_ENGINE_SECRET_".getBytes(StandardCharsets.UTF_8))
-      secretNode.start()
+    withDiscoveryClient(conf) { discoveryClient =>
+      discoveryClient.create(engineSecretNode, "PERSISTENT", false)
+      discoveryClient.startSecretNode("PERSISTENT", engineSecretNode, "_ENGINE_SECRET_")
     }
 
     conf.set(KyuubiConf.ENGINE_SECURITY_ENABLED, true)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
index 62284fd9d..fba26709d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.server
 import org.apache.kyuubi.{KyuubiFunSuite, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
 import org.apache.kyuubi.service.ServiceState._
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
@@ -116,16 +116,16 @@ class KyuubiServerSuite extends KyuubiFunSuite {
     val chrootPath = "/lake"
     conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkConnection)
     // chroot path does not exist before server start
-    ZooKeeperClientProvider.withZkClient(conf) { client =>
-      assert(client.checkExists().forPath(chrootPath) == null)
+    DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
+      assert(client.pathNonExists(chrootPath))
     }
 
     val zkWithChroot = zkConnection + chrootPath
     val chrootConf = conf.clone.set(HighAvailabilityConf.HA_ZK_QUORUM, zkWithChroot)
     server = KyuubiServer.startServer(chrootConf)
     // chroot path exists after server started
-    ZooKeeperClientProvider.withZkClient(conf) { client =>
-      assert(client.checkExists().forPath(chrootPath) != null)
+    DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
+      assert(client.pathExists(chrootPath))
     }
   }
 }