You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/07/12 03:04:43 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2644] Add etcd discovery client for HA

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 32970ce68 [KYUUBI #2644] Add etcd discovery client for HA
32970ce68 is described below

commit 32970ce685b10889f1951ce360f89e1b138d1148
Author: hongdongdong <ho...@cmss.chinamobile.com>
AuthorDate: Tue Jul 12 11:04:35 2022 +0800

    [KYUUBI #2644] Add etcd discovery client for HA
    
    ### _Why are the changes needed?_
    
    Add etcd discovery client for HA
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2767 from hddong/etcd-support.
    
    Closes #2644
    
    6fa37578 [hongdongdong] Rebase and remove from rat-excludes
    fc23a2b4 [hongdongdong] upgrade grpc-1.47.0
    61c77d27 [hongdongdong] Fix tests
    28abd13e [hongdongdong] fix comments
    9e81a6a3 [hongdongdong] Rename HA_ZK_ENGINE_REF_ID to HA_ENGINE_REF_ID
    aa2b4260 [hongdongdong] [KYUUBI #2644][WIP] Add etcd discovery client for HA
    
    Authored-by: hongdongdong <ho...@cmss.chinamobile.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 LICENSE-binary                                     |   3 +
 dev/dependencyList                                 |  32 +-
 docs/deployment/settings.md                        |   1 +
 externals/kyuubi-spark-sql-engine/pom.xml          |  59 ++++
 .../META-INF/services/io.grpc.LoadBalancerProvider |  20 ++
 .../META-INF/services/io.grpc.NameResolverProvider |  21 ++
 .../spark/EtcdShareLevelSparkEngineSuite.scala     |  43 +++
 ...uite.scala => ShareLevelSparkEngineTests.scala} |  20 +-
 .../engine/spark/WithDiscoverySparkSQLEngine.scala |  39 +--
 ...SQLEngine.scala => WithEmbeddedZookeeper.scala} |  36 +--
 .../kyuubi/engine/spark/WithEtcdCluster.scala      |  52 ++++
 .../ZookeeperShareLevelSparkEngineSuite.scala      |  43 +++
 .../kyuubi/SparkSQLEngineDeregisterSuite.scala     |  37 ++-
 kyuubi-ha/pom.xml                                  |  36 +++
 .../apache/kyuubi/ha/HighAvailabilityConf.scala    |  11 +-
 .../ha/client/etcd/EtcdDiscoveryClient.scala       | 340 +++++++++++++++++++++
 .../ha/client/zookeeper/ZookeeperACLProvider.scala |   4 +-
 .../client/zookeeper/ZookeeperClientProvider.scala |   4 +-
 .../zookeeper/ZookeeperDiscoveryClient.scala       |   4 +-
 .../kyuubi/ha/client/DiscoveryClientTests.scala    | 157 ++++++++++
 .../ha/client/etcd/EtcdDiscoveryClientSuite.scala  |  74 +++++
 .../zookeeper/ZookeeperDiscoveryClientSuite.scala  | 135 +-------
 kyuubi-server/pom.xml                              |  16 +
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |   4 +-
 .../{EngineRefSuite.scala => EngineRefTests.scala} |  78 +----
 .../kyuubi/engine/EngineRefWithEtcdSuite.scala     |  54 ++++
 .../engine/EngineRefWithZookeeperSuite.scala       | 104 +++++++
 pom.xml                                            |  50 +++
 28 files changed, 1176 insertions(+), 301 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 2860e0640..74f18b45a 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -263,6 +263,9 @@ io.dropwizard.metrics:metrics-jmx
 io.dropwizard.metrics:metrics-json
 io.dropwizard.metrics:metrics-jvm
 io.fabric8:kubernetes-client
+io.etcd:jetcd-core
+io.grpc:grpc-core
+io.grpc:grpc-protobuf
 io.netty:netty-all
 io.netty:netty-buffer
 io.netty:netty-codec
diff --git a/dev/dependencyList b/dev/dependencyList
index 7f3f6a71a..5abe9992d 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -16,6 +16,8 @@
 #
 
 HikariCP/4.0.3//HikariCP-4.0.3.jar
+animal-sniffer-annotations/1.19//animal-sniffer-annotations-1.19.jar
+annotations/4.1.1.4//annotations-4.1.1.4.jar
 aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
 automaton/1.11-8//automaton-1.11-8.jar
 classgraph/4.8.138//classgraph-4.8.138.jar
@@ -28,8 +30,19 @@ curator-client/2.12.0//curator-client-2.12.0.jar
 curator-framework/2.12.0//curator-framework-2.12.0.jar
 curator-recipes/2.12.0//curator-recipes-2.12.0.jar
 derby/10.14.2.0//derby-10.14.2.0.jar
+error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
+failsafe/2.4.4//failsafe-2.4.4.jar
 fliptables/1.0.2//fliptables-1.0.2.jar
 generex/1.0.2//generex-1.0.2.jar
+grpc-api/1.47.0//grpc-api-1.47.0.jar
+grpc-context/1.47.0//grpc-context-1.47.0.jar
+grpc-core/1.47.0//grpc-core-1.47.0.jar
+grpc-grpclb/1.47.0//grpc-grpclb-1.47.0.jar
+grpc-netty/1.47.0//grpc-netty-1.47.0.jar
+grpc-protobuf-lite/1.47.0//grpc-protobuf-lite-1.47.0.jar
+grpc-protobuf/1.47.0//grpc-protobuf-1.47.0.jar
+grpc-stub/1.47.0//grpc-stub-1.47.0.jar
+gson/2.8.9//gson-2.8.9.jar
 guava/30.1-jre//guava-30.1-jre.jar
 hadoop-client-api/3.3.1//hadoop-client-api-3.3.1.jar
 hadoop-client-runtime/3.3.1//hadoop-client-runtime-3.3.1.jar
@@ -47,6 +60,7 @@ hk2-utils/2.6.1//hk2-utils-2.6.1.jar
 htrace-core4/4.1.0-incubating//htrace-core4-4.1.0-incubating.jar
 httpclient/4.5.13//httpclient-4.5.13.jar
 httpcore/4.4.15//httpcore-4.4.15.jar
+j2objc-annotations/1.3//j2objc-annotations-1.3.jar
 jackson-annotations/2.13.3//jackson-annotations-2.13.3.jar
 jackson-core/2.13.3//jackson-core-2.13.3.jar
 jackson-databind/2.13.3//jackson-databind-2.13.3.jar
@@ -72,6 +86,10 @@ jersey-entity-filtering/2.36//jersey-entity-filtering-2.36.jar
 jersey-hk2/2.36//jersey-hk2-2.36.jar
 jersey-media-json-jackson/2.36//jersey-media-json-jackson-2.36.jar
 jersey-server/2.36//jersey-server-2.36.jar
+jetcd-api/0.7.1//jetcd-api-0.7.1.jar
+jetcd-common/0.7.1//jetcd-common-0.7.1.jar
+jetcd-core/0.7.1//jetcd-core-0.7.1.jar
+jetcd-grpc/0.7.1//jetcd-grpc-0.7.1.jar
 jetty-http/9.4.48.v20220622//jetty-http-9.4.48.v20220622.jar
 jetty-io/9.4.48.v20220622//jetty-io-9.4.48.v20220622.jar
 jetty-security/9.4.48.v20220622//jetty-security-9.4.48.v20220622.jar
@@ -115,9 +133,15 @@ metrics-json/4.2.8//metrics-json-4.2.8.jar
 metrics-jvm/4.2.8//metrics-jvm-4.2.8.jar
 netty-all/4.1.73.Final//netty-all-4.1.73.Final.jar
 netty-buffer/4.1.73.Final//netty-buffer-4.1.73.Final.jar
+netty-codec-dns/4.1.74.Final//netty-codec-dns-4.1.74.Final.jar
+netty-codec-http/4.1.72.Final//netty-codec-http-4.1.72.Final.jar
+netty-codec-http2/4.1.72.Final//netty-codec-http2-4.1.72.Final.jar
+netty-codec-socks/4.1.72.Final//netty-codec-socks-4.1.72.Final.jar
 netty-codec/4.1.73.Final//netty-codec-4.1.73.Final.jar
 netty-common/4.1.73.Final//netty-common-4.1.73.Final.jar
+netty-handler-proxy/4.1.72.Final//netty-handler-proxy-4.1.72.Final.jar
 netty-handler/4.1.73.Final//netty-handler-4.1.73.Final.jar
+netty-resolver-dns/4.1.74.Final//netty-resolver-dns-4.1.74.Final.jar
 netty-resolver/4.1.73.Final//netty-resolver-4.1.73.Final.jar
 netty-tcnative-classes/2.0.46.Final//netty-tcnative-classes-2.0.46.Final.jar
 netty-transport-classes-epoll/4.1.73.Final//netty-transport-classes-epoll-4.1.73.Final.jar
@@ -126,12 +150,16 @@ netty-transport-native-epoll/4.1.73.Final/linux-aarch_64/netty-transport-native-
 netty-transport-native-epoll/4.1.73.Final/linux-x86_64/netty-transport-native-epoll-4.1.73.Final-linux-x86_64.jar
 netty-transport-native-kqueue/4.1.73.Final/osx-aarch_64/netty-transport-native-kqueue-4.1.73.Final-osx-aarch_64.jar
 netty-transport-native-kqueue/4.1.73.Final/osx-x86_64/netty-transport-native-kqueue-4.1.73.Final-osx-x86_64.jar
-netty-transport-native-unix-common/4.1.73.Final//netty-transport-native-unix-common-4.1.73.Final.jar
+netty-transport-native-unix-common/4.1.72.Final//netty-transport-native-unix-common-4.1.72.Final.jar
 netty-transport/4.1.73.Final//netty-transport-4.1.73.Final.jar
 okhttp/3.12.12//okhttp-3.12.12.jar
 okio/1.15.0//okio-1.15.0.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
+perfmark-api/0.25.0//perfmark-api-0.25.0.jar
+proto-google-common-protos/2.0.1//proto-google-common-protos-2.0.1.jar
+protobuf-java-util/3.19.2//protobuf-java-util-3.19.2.jar
+protobuf-java/3.19.2//protobuf-java-3.19.2.jar
 scala-library/2.12.15//scala-library-2.12.15.jar
 scopt_2.12/4.1.0//scopt_2.12-4.1.0.jar
 simpleclient/0.14.1//simpleclient-0.14.1.jar
@@ -149,5 +177,7 @@ swagger-core/2.2.1//swagger-core-2.2.1.jar
 swagger-integration/2.2.1//swagger-integration-2.2.1.jar
 swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar
 swagger-models/2.2.1//swagger-models-2.2.1.jar
+vertx-core/4.2.7//vertx-core-4.2.7.jar
+vertx-grpc/4.2.7//vertx-grpc-4.2.7.jar
 zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
 zookeeper/3.4.14//zookeeper-3.4.14.jar
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 78e4599b3..4d8d9fa6c 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -310,6 +310,7 @@ Key | Default | Meaning | Type | Since
 --- | --- | --- | --- | ---
 kyuubi.ha.addresses||The connection string for the discovery ensemble|string|1.6.0
 kyuubi.ha.client.class|org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient|Class name for service discovery client.|string|1.6.0
+kyuubi.ha.etcd.lease.timeout|PT10S|Timeout for the etcd keep-alive lease. The Kyuubi server will detect the unexpected loss of an engine after at most this duration.|duration|1.6.0
 kyuubi.ha.namespace|kyuubi|The root directory for the service to deploy its instance uri|string|1.6.0
 kyuubi.ha.zookeeper.acl.enabled|false|Set to true if the zookeeper ensemble is kerberized|boolean|1.0.0
 kyuubi.ha.zookeeper.auth.digest|&lt;undefined&gt;|The digest auth string is used for zookeeper authentication, like: username:password.|string|1.3.2
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml
index e247367bb..56d8ceca9 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -51,6 +51,16 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
@@ -179,6 +189,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>io.etcd</groupId>
+            <artifactId>jetcd-launcher</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -192,6 +208,14 @@
                     <shadedArtifactAttached>false</shadedArtifactAttached>
                     <artifactSet>
                         <includes>
+                            <include>com.google.guava:*</include>
+                            <include>com.google.protobuf:*</include>
+                            <include>io.etcd:*</include>
+                            <include>io.grpc:*</include>
+                            <include>io.netty:*</include>
+                            <include>io.perfmark:perfmark-api</include>
+                            <include>io.vertx:*</include>
+                            <include>net.jodah:failsafe</include>
                             <include>org.apache.kyuubi:kyuubi-common_${scala.binary.version}</include>
                             <include>org.apache.kyuubi:kyuubi-events_${scala.binary.version}</include>
                             <include>org.apache.kyuubi:kyuubi-ha_${scala.binary.version}</include>
@@ -249,6 +273,41 @@
                                 <include>org.apache.thrift.**</include>
                             </includes>
                         </relocation>
+                        <relocation>
+                            <pattern>com.google</pattern>
+                            <shadedPattern>${kyuubi.shade.packageName}.com.google</shadedPattern>
+                            <includes>
+                                <include>com.google.**</include>
+                            </includes>
+                        </relocation>
+                        <relocation>
+                            <pattern>io.netty</pattern>
+                            <shadedPattern>${kyuubi.shade.packageName}.io.netty</shadedPattern>
+                            <includes>
+                                <include>io.netty.**</include>
+                            </includes>
+                        </relocation>
+                        <relocation>
+                            <pattern>io.vertx</pattern>
+                            <shadedPattern>${kyuubi.shade.packageName}.io.vertx</shadedPattern>
+                            <includes>
+                                <include>io.vertx.**</include>
+                            </includes>
+                        </relocation>
+                        <relocation>
+                            <pattern>net.jodah</pattern>
+                            <shadedPattern>${kyuubi.shade.packageName}.net.jodah</shadedPattern>
+                            <includes>
+                                <include>net.jodah.**</include>
+                            </includes>
+                        </relocation>
+                        <relocation>
+                            <pattern>io.perfmark</pattern>
+                            <shadedPattern>${kyuubi.shade.packageName}.io.perfmark</shadedPattern>
+                            <includes>
+                                <include>io.perfmark.**</include>
+                            </includes>
+                        </relocation>
                     </relocations>
                 </configuration>
                 <executions>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider
new file mode 100644
index 000000000..13c372b25
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+io.grpc.internal.PickFirstLoadBalancerProvider
+io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider
+io.grpc.grpclb.GrpclbLoadBalancerProvider
\ No newline at end of file
diff --git a/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.NameResolverProvider b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.NameResolverProvider
new file mode 100644
index 000000000..46b5718ff
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.NameResolverProvider
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+io.etcd.jetcd.resolver.DnsSrvResolverProvider
+io.etcd.jetcd.resolver.HttpResolverProvider
+io.etcd.jetcd.resolver.HttpsResolverProvider
+io.etcd.jetcd.resolver.IPResolverProvider
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
new file mode 100644
index 000000000..46dc3b54c
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.ShareLevel.ShareLevel
+
+trait EtcdShareLevelSparkEngineSuite
+  extends ShareLevelSparkEngineTests with WithEtcdCluster {
+  override def withKyuubiConf: Map[String, String] = {
+    super.withKyuubiConf ++
+      etcdConf ++ Map(
+        ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
+        ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
+        ENGINE_CHECK_INTERVAL.key -> "PT5s")
+  }
+}
+
+class EtcdConnectionLevelSparkEngineSuite extends EtcdShareLevelSparkEngineSuite {
+  override def shareLevel: ShareLevel = ShareLevel.CONNECTION
+}
+
+class EtcdUserLevelSparkEngineSuite extends EtcdShareLevelSparkEngineSuite {
+  override def shareLevel: ShareLevel = ShareLevel.USER
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala
similarity index 80%
rename from externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
rename to externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala
index b75800d5f..c83139592 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala
@@ -22,9 +22,6 @@ import java.util.UUID
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel.ShareLevel
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
@@ -34,15 +31,10 @@ import org.apache.kyuubi.service.ServiceState
  * This suite is to test some behavior with spark engine in different share level.
  * e.g. cleanup discovery service before stop.
  */
-abstract class ShareLevelSparkEngineSuite
+trait ShareLevelSparkEngineTests
   extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper {
   def shareLevel: ShareLevel
-  override def withKyuubiConf: Map[String, String] = {
-    super.withKyuubiConf ++ Map(
-      ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
-      ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
-      ENGINE_CHECK_INTERVAL.key -> "PT5s")
-  }
+
   override protected def jdbcUrl: String = getJdbcUrl
   override val namespace: String = {
     // for test, we always use uuid as namespace
@@ -85,11 +77,3 @@ abstract class ShareLevelSparkEngineSuite
     }
   }
 }
-
-class ConnectionLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {
-  override def shareLevel: ShareLevel = ShareLevel.CONNECTION
-}
-
-class UserLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {
-  override def shareLevel: ShareLevel = ShareLevel.USER
-}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
index 4a976c89d..7f2f06517 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
@@ -17,39 +17,16 @@
 
 package org.apache.kyuubi.engine.spark
 
-import org.apache.kyuubi.Utils
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_NAMESPACE, HA_ZK_AUTH_TYPE}
-import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
 import org.apache.kyuubi.ha.client.DiscoveryClient
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider
-import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
 trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
-  private var zkServer: EmbeddedZookeeper = _
-  def namespace: String
-  override def withKyuubiConf: Map[String, String] = {
-    assert(zkServer != null)
-    Map(
-      HA_ADDRESSES.key -> zkServer.getConnectString,
-      HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString,
-      HA_NAMESPACE.key -> namespace)
-  }
 
-  override def beforeAll(): Unit = {
-    zkServer = new EmbeddedZookeeper()
-    val zkData = Utils.createTempDir()
-    val tmpConf = KyuubiConf()
-    tmpConf.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
-    tmpConf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString)
-    zkServer.initialize(tmpConf)
-    zkServer.start()
-  }
+  def namespace: String
 
-  override def afterAll(): Unit = {
-    if (zkServer != null) {
-      zkServer.stop()
-    }
+  override def withKyuubiConf: Map[String, String] = {
+    Map(HA_NAMESPACE.key -> namespace)
   }
 
   override protected def beforeEach(): Unit = {
@@ -65,12 +42,4 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
   def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
     DiscoveryClientProvider.withDiscoveryClient(kyuubiConf)(f)
   }
-
-  protected def getDiscoveryConnectionString: String = {
-    if (zkServer == null) {
-      ""
-    } else {
-      zkServer.getConnectString
-    }
-  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEmbeddedZookeeper.scala
similarity index 66%
copy from externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
copy to externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEmbeddedZookeeper.scala
index 4a976c89d..7f2d6b7a7 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEmbeddedZookeeper.scala
@@ -17,23 +17,21 @@
 
 package org.apache.kyuubi.engine.spark
 
+import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_NAMESPACE, HA_ZK_AUTH_TYPE}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_ZK_AUTH_TYPE}
 import org.apache.kyuubi.ha.client.AuthTypes
-import org.apache.kyuubi.ha.client.DiscoveryClient
-import org.apache.kyuubi.ha.client.DiscoveryClientProvider
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
-trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
+trait WithEmbeddedZookeeper extends KyuubiFunSuite {
   private var zkServer: EmbeddedZookeeper = _
-  def namespace: String
-  override def withKyuubiConf: Map[String, String] = {
+
+  lazy val zookeeperConf: Map[String, String] = {
     assert(zkServer != null)
     Map(
       HA_ADDRESSES.key -> zkServer.getConnectString,
-      HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString,
-      HA_NAMESPACE.key -> namespace)
+      HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString)
   }
 
   override def beforeAll(): Unit = {
@@ -51,26 +49,4 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
       zkServer.stop()
     }
   }
-
-  override protected def beforeEach(): Unit = {
-    super.beforeEach()
-    startSparkEngine()
-  }
-
-  override protected def afterEach(): Unit = {
-    super.afterEach()
-    stopSparkEngine()
-  }
-
-  def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
-    DiscoveryClientProvider.withDiscoveryClient(kyuubiConf)(f)
-  }
-
-  protected def getDiscoveryConnectionString: String = {
-    if (zkServer == null) {
-      ""
-    } else {
-      zkServer.getConnectString
-    }
-  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEtcdCluster.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEtcdCluster.scala
new file mode 100644
index 000000000..690f6c9cc
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEtcdCluster.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import scala.collection.JavaConverters._
+
+import io.etcd.jetcd.launcher.Etcd
+import io.etcd.jetcd.launcher.EtcdCluster
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_CLIENT_CLASS
+import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient
+
+trait WithEtcdCluster extends KyuubiFunSuite {
+
+  private var etcdCluster: EtcdCluster = _
+
+  lazy val etcdConf: Map[String, String] = {
+    Map(
+      HA_ADDRESSES.key -> etcdCluster.clientEndpoints().asScala.mkString(","),
+      HA_CLIENT_CLASS.key -> classOf[EtcdDiscoveryClient].getName)
+  }
+
+  override def beforeAll(): Unit = {
+    etcdCluster = new Etcd.Builder().withNodes(1).build()
+    etcdCluster.start()
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    if (etcdCluster != null) {
+      etcdCluster.close()
+    }
+    super.afterAll()
+  }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
new file mode 100644
index 000000000..4ef96e61a
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.ShareLevel.ShareLevel
+
+trait ZookeeperShareLevelSparkEngineSuite
+  extends ShareLevelSparkEngineTests with WithEmbeddedZookeeper {
+  override def withKyuubiConf: Map[String, String] = {
+    super.withKyuubiConf ++
+      zookeeperConf ++ Map(
+        ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
+        ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
+        ENGINE_CHECK_INTERVAL.key -> "PT5s")
+  }
+}
+
+class ConnectionLevelSparkEngineSuite extends ZookeeperShareLevelSparkEngineSuite {
+  override def shareLevel: ShareLevel = ShareLevel.CONNECTION
+}
+
+class UserLevelSparkEngineSuite extends ZookeeperShareLevelSparkEngineSuite {
+  override def shareLevel: ShareLevel = ShareLevel.USER
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala
index cd13a4b6d..8dc93759b 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala
@@ -26,15 +26,18 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.sparkMajorMinorVersion
 import org.apache.kyuubi.engine.spark.WithDiscoverySparkSQLEngine
+import org.apache.kyuubi.engine.spark.WithEmbeddedZookeeper
 import org.apache.kyuubi.service.ServiceState
 
-abstract class SparkSQLEngineDeregisterSuite extends WithDiscoverySparkSQLEngine {
+abstract class SparkSQLEngineDeregisterSuite
+  extends WithDiscoverySparkSQLEngine with WithEmbeddedZookeeper {
   protected val maxJobFailures: Int = 2
 
   override def withKyuubiConf: Map[String, String] = {
-    super.withKyuubiConf ++ Map(
-      ANSI_ENABLED.key -> "true",
-      ENGINE_DEREGISTER_JOB_MAX_FAILURES.key -> maxJobFailures.toString)
+    super.withKyuubiConf ++
+      zookeeperConf ++ Map(
+        ANSI_ENABLED.key -> "true",
+        ENGINE_DEREGISTER_JOB_MAX_FAILURES.key -> maxJobFailures.toString)
   }
 
   override val namespace: String = s"/kyuubi/deregister_test/${UUID.randomUUID().toString}"
@@ -81,22 +84,24 @@ class SparkSQLEngineDeregisterMsgSuite extends SparkSQLEngineDeregisterSuite {
   }
 }
 
-class SparkSQLEngineDeregisterExceptionTTLSuite extends WithDiscoverySparkSQLEngine {
+class SparkSQLEngineDeregisterExceptionTTLSuite
+  extends WithDiscoverySparkSQLEngine with WithEmbeddedZookeeper {
   protected val maxJobFailures: Int = 2
   protected val deregisterExceptionTTL = 2000
 
   override def withKyuubiConf: Map[String, String] = {
-    super.withKyuubiConf ++ Map(
-      ANSI_ENABLED.key -> "true",
-      ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> {
-        sparkMajorMinorVersion match {
-          // see https://issues.apache.org/jira/browse/SPARK-35958
-          case (3, minor) if minor > 2 => "org.apache.spark.SparkArithmeticException"
-          case _ => classOf[ArithmeticException].getCanonicalName
-        }
-      },
-      ENGINE_DEREGISTER_JOB_MAX_FAILURES.key -> maxJobFailures.toString,
-      ENGINE_DEREGISTER_EXCEPTION_TTL.key -> deregisterExceptionTTL.toString)
+    super.withKyuubiConf ++
+      zookeeperConf ++ Map(
+        ANSI_ENABLED.key -> "true",
+        ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> {
+          sparkMajorMinorVersion match {
+            // see https://issues.apache.org/jira/browse/SPARK-35958
+            case (3, minor) if minor > 2 => "org.apache.spark.SparkArithmeticException"
+            case _ => classOf[ArithmeticException].getCanonicalName
+          }
+        },
+        ENGINE_DEREGISTER_JOB_MAX_FAILURES.key -> maxJobFailures.toString,
+        ENGINE_DEREGISTER_EXCEPTION_TTL.key -> deregisterExceptionTTL.toString)
   }
 
   override val namespace: String = s"/kyuubi/deregister_test/${UUID.randomUUID().toString}"
diff --git a/kyuubi-ha/pom.xml b/kyuubi-ha/pom.xml
index 79750932b..4d4ea51aa 100644
--- a/kyuubi-ha/pom.xml
+++ b/kyuubi-ha/pom.xml
@@ -59,6 +59,42 @@
             <artifactId>guava</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>io.etcd</groupId>
+            <artifactId>jetcd-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-grpclb</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.etcd</groupId>
+            <artifactId>jetcd-launcher</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
             <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
index c47a8ffd4..f38a45ce6 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
@@ -155,7 +155,7 @@ object HighAvailabilityConf {
       .checkValue(_ > 0, "Must be positive")
       .createWithDefault(Duration.ofSeconds(120).toMillis)
 
-  val HA_ZK_ENGINE_REF_ID: OptionalConfigEntry[String] =
+  val HA_ENGINE_REF_ID: OptionalConfigEntry[String] =
     buildConf("kyuubi.ha.engine.ref.id")
       .doc("The engine reference id will be attached to zookeeper node when engine started, " +
         "and the kyuubi server will check it cyclically.")
@@ -180,4 +180,13 @@ object HighAvailabilityConf {
       .version("1.5.0")
       .stringConf
       .createOptional
+
+  val HA_ETCD_LEASE_TIMEOUT: ConfigEntry[Long] =
+    buildConf("kyuubi.ha.etcd.lease.timeout")
+      .doc("Timeout for etcd keep alive lease. The kyuubi server will know the " +
+        "unexpected loss of an engine after at most this timeout.")
+      .version("1.6.0")
+      .timeConf
+      .checkValue(_ > 0, "Must be positive")
+      .createWithDefault(Duration.ofSeconds(10).toMillis)
 }
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
new file mode 100644
index 000000000..c9245a88e
--- /dev/null
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client.etcd
+
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.TimeoutException
+
+import com.google.common.annotations.VisibleForTesting
+import io.etcd.jetcd.ByteSequence
+import io.etcd.jetcd.Client
+import io.etcd.jetcd.KV
+import io.etcd.jetcd.Lease
+import io.etcd.jetcd.Lock
+import io.etcd.jetcd.Watch
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse
+import io.etcd.jetcd.options.DeleteOption
+import io.etcd.jetcd.options.GetOption
+import io.etcd.jetcd.options.PutOption
+import io.etcd.jetcd.watch.WatchEvent
+import io.etcd.jetcd.watch.WatchResponse
+import io.grpc.stub.StreamObserver
+
+import org.apache.kyuubi.KYUUBI_VERSION
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_INIT_TIMEOUT
+import org.apache.kyuubi.ha.HighAvailabilityConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ENGINE_REF_ID
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryPaths
+import org.apache.kyuubi.ha.client.ServiceDiscovery
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient._
+
+class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
+
+  case class ServiceNode(path: String, lease: Long)
+
+  var client: Client = _
+  var kvClient: KV = _
+  var lockClient: Lock = _
+  var leaseClient: Lease = _
+  var serviceNode: ServiceNode = _
+
+  var leaseTTL: Long = _
+
+  def createClient(): Unit = {
+    val endpoints = conf.get(HighAvailabilityConf.HA_ADDRESSES)
+    client = Client.builder.endpoints(endpoints).build
+    kvClient = client.getKVClient()
+    lockClient = client.getLockClient()
+    leaseClient = client.getLeaseClient()
+
+    leaseTTL = conf.get(HighAvailabilityConf.HA_ETCD_LEASE_TIMEOUT) / 1000
+  }
+
+  def closeClient(): Unit = {
+    if (client != null) {
+      client.close()
+    }
+  }
+
+  def create(path: String, mode: String, createParent: Boolean = true): String = {
+    // createParent has no effect here: etcd keys are flat and need no parent nodes
+    mode match {
+      case "PERSISTENT" => kvClient.put(
+          ByteSequence.from(path.getBytes()),
+          ByteSequence.from(path.getBytes())).get()
+      case m => throw new KyuubiException(s"Create mode $m is not support in etcd!")
+    }
+    path
+  }
+
+  def getData(path: String): Array[Byte] = {
+    val response = kvClient.get(ByteSequence.from(path.getBytes())).get()
+    if (response.getKvs.isEmpty) {
+      throw new KyuubiException(s"Key[$path] not exists in ETCD, please check it.")
+    } else {
+      response.getKvs.get(0).getValue.getBytes
+    }
+  }
+
+  def getChildren(path: String): List[String] = {
+    val kvs = kvClient.get(
+      ByteSequence.from(path.getBytes()),
+      GetOption.newBuilder().isPrefix(true).build()).get().getKvs
+    if (kvs.isEmpty) {
+      List.empty
+    } else {
+      kvs.asScala.map(kv => kv.getKey.toString(UTF_8).stripPrefix(path).stripPrefix("/"))
+        .filter(key => key.nonEmpty && !key.startsWith("lock")).toList
+    }
+  }
+
+  def pathExists(path: String): Boolean = {
+    !pathNonExists(path)
+  }
+
+  def pathNonExists(path: String): Boolean = {
+    kvClient.get(ByteSequence.from(path.getBytes())).get().getKvs.isEmpty
+  }
+
+  def delete(path: String, deleteChildren: Boolean = false): Unit = {
+    kvClient.delete(
+      ByteSequence.from(path.getBytes()),
+      DeleteOption.newBuilder().isPrefix(deleteChildren).build()).get()
+  }
+
+  def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
+    // not needed with etcd
+  }
+
+  def tryWithLock[T](
+      lockPath: String,
+      timeout: Long)(f: => T): T = {
+    // the default unit is millis, convert to seconds.
+    // add 3 more seconds to leaseTime so the client fails fast
+    val leaseTime = timeout / 1000 + 3
+    // if the lease expires, the lock is automatically released.
+    val leaseId = leaseClient.grant(leaseTime).get().getID()
+    try {
+      try {
+        // Acquire the distributed lock. If another client/process holds the lock,
+        // this call blocks until the lock is released or the lease expires.
+
+        // throws TimeoutException when acquiring the lock times out
+        lockClient.lock(ByteSequence.from(lockPath.getBytes()), leaseId)
+          .get(timeout, TimeUnit.MILLISECONDS)
+      } catch {
+        case _: TimeoutException =>
+          throw KyuubiSQLException(s"Timeout to lock on path [$lockPath] after " +
+            s"$timeout ms. There would be some problem that other session may " +
+            s"create engine timeout.")
+        case e: Exception =>
+          throw new KyuubiException(s"Lock failed on path [$lockPath]", e)
+      }
+      f
+    } finally {
+      try {
+        lockClient.unlock(ByteSequence.from(lockPath.getBytes())).get()
+        leaseClient.revoke(leaseId).get()
+      } catch {
+        case e: Exception => throw new KyuubiException(e.getMessage, e.getCause)
+      }
+    }
+  }
+
+  def getServerHost(namespace: String): Option[(String, Int)] = {
+    // TODO: use the last one to avoid touching engines that may have crashed
+    // We need a big improvement here.
+    getServiceNodesInfo(namespace, Some(1), silent = true) match {
+      case Seq(sn) => Some((sn.host, sn.port))
+      case _ => None
+    }
+  }
+
+  def getEngineByRefId(
+      namespace: String,
+      engineRefId: String): Option[(String, Int)] = {
+    getServiceNodesInfo(namespace, silent = true)
+      .find(_.engineRefId.exists(_.equals(engineRefId)))
+      .map(data => (data.host, data.port))
+  }
+
+  def getServiceNodesInfo(
+      namespace: String,
+      sizeOpt: Option[Int] = None,
+      silent: Boolean = false): Seq[ServiceNodeInfo] = {
+    try {
+      val hosts = getChildren(DiscoveryPaths.makePath(null, namespace))
+      val size = sizeOpt.getOrElse(hosts.size)
+      hosts.takeRight(size).map { p =>
+        val path = DiscoveryPaths.makePath(namespace, p)
+        val instance = new String(getData(path), UTF_8)
+        val (host, port) = DiscoveryClient.parseInstanceHostPort(instance)
+        val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
+        val engineRefId = p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId="))
+        info(s"Get service instance:$instance and version:$version under $namespace")
+        ServiceNodeInfo(namespace, p, host, port, version, engineRefId)
+      }
+    } catch {
+      case _: Exception if silent => Nil
+      case e: Exception =>
+        error(s"Failed to get service node info", e)
+        Nil
+    }
+  }
+
+  def registerService(
+      conf: KyuubiConf,
+      namespace: String,
+      serviceDiscovery: ServiceDiscovery,
+      version: Option[String] = None,
+      external: Boolean = false): Unit = {
+    val instance = serviceDiscovery.fe.connectionUrl
+    val watcher = new DeRegisterWatcher(instance, serviceDiscovery)
+
+    val serviceNode = createPersistentNode(conf, namespace, instance, version, external)
+
+    client.getWatchClient.watch(ByteSequence.from(serviceNode.path.getBytes()), watcher)
+
+    if (pathNonExists(serviceNode.path)) {
+      // No node exists, throw exception
+      throw new KyuubiException(s"Unable to create keyValue for this Kyuubi " +
+        s"instance[${instance}] on ETCD.")
+    }
+  }
+
+  def deregisterService(): Unit = {
+    // revoke the lease-bound key in etcd (the counterpart of ZooKeeper's EPHEMERAL_SEQUENTIAL node)
+    if (serviceNode != null) {
+      if (serviceNode.lease != LEASE_NULL_VALUE) {
+        client.getLeaseClient.revoke(serviceNode.lease)
+        delete(serviceNode.path)
+      }
+      serviceNode = null
+    }
+  }
+
+  def postDeregisterService(namespace: String): Boolean = {
+    if (namespace != null) {
+      delete(DiscoveryPaths.makePath(null, namespace), true)
+      true
+    } else {
+      false
+    }
+  }
+
+  def createAndGetServiceNode(
+      conf: KyuubiConf,
+      namespace: String,
+      instance: String,
+      version: Option[String] = None,
+      external: Boolean = false): String = {
+    createPersistentNode(conf, namespace, instance, version, external).path
+  }
+
+  @VisibleForTesting
+  def startSecretNode(
+      createMode: String,
+      basePath: String,
+      initData: String,
+      useProtection: Boolean = false): Unit = {
+    client.getKVClient.put(
+      ByteSequence.from(basePath.getBytes()),
+      ByteSequence.from(initData.getBytes())).get()
+  }
+
+  private def createPersistentNode(
+      conf: KyuubiConf,
+      namespace: String,
+      instance: String,
+      version: Option[String] = None,
+      external: Boolean = false): ServiceNode = {
+    val ns = DiscoveryPaths.makePath(null, namespace)
+    create(ns, "PERSISTENT")
+
+    val session = conf.get(HA_ENGINE_REF_ID)
+      .map(refId => s"refId=$refId;").getOrElse("")
+    val pathPrefix = DiscoveryPaths.makePath(
+      namespace,
+      s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
+    val znode = instance
+
+    var leaseId: Long = LEASE_NULL_VALUE
+    var realPath: String = null
+    // Use the same of engine init timeout
+    val timeout = conf.get(ENGINE_INIT_TIMEOUT)
+    // lock to get instance sequence
+    tryWithLock(s"$ns$LOCK_PATH_SUFFIX", timeout) {
+      val instances = getChildren(pathPrefix).map(_.stripPrefix(pathPrefix).toLong)
+      val sequence: Long = if (instances.isEmpty) 0 else instances.max + 1
+      realPath = s"$pathPrefix${"%010d".format(sequence)}"
+
+      if (external) {
+        client.getKVClient.put(
+          ByteSequence.from(realPath.getBytes()),
+          ByteSequence.from(znode.getBytes())).get()
+      } else {
+        leaseId = client.getLeaseClient.grant(leaseTTL).get().getID
+        client.getLeaseClient.keepAlive(
+          leaseId,
+          new StreamObserver[LeaseKeepAliveResponse] {
+            override def onNext(v: LeaseKeepAliveResponse): Unit = Unit // do nothing
+
+            override def onError(throwable: Throwable): Unit = Unit // do nothing
+
+            override def onCompleted(): Unit = Unit // do nothing
+          })
+        client.getKVClient.put(
+          ByteSequence.from(realPath.getBytes()),
+          ByteSequence.from(znode.getBytes()),
+          PutOption.newBuilder().withLeaseId(leaseId).build()).get()
+      }
+    }
+    ServiceNode(realPath, leaseId)
+  }
+
+  class DeRegisterWatcher(instance: String, serviceDiscovery: ServiceDiscovery)
+    extends Watch.Listener {
+
+    override def onNext(watchResponse: WatchResponse): Unit = {
+      watchResponse.getEvents.asScala
+        .filter(_.getEventType == WatchEvent.EventType.DELETE).foreach(_ => {
+          warn(s"This Kyuubi instance ${instance} is now de-registered from" +
+            s" ETCD. The server will be shut down after the last client session completes.")
+          serviceDiscovery.stopGracefully()
+        })
+    }
+
+    override def onError(throwable: Throwable): Unit =
+      throw new KyuubiException(throwable.getMessage, throwable.getCause)
+
+    override def onCompleted(): Unit = Unit
+  }
+}
+
+object EtcdDiscoveryClient {
+  final private val LEASE_NULL_VALUE: Long = -1
+  final private[etcd] val LOCK_PATH_SUFFIX = "/lock"
+}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
index 40ead4633..467c323b7 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
@@ -42,9 +42,9 @@ class ZookeeperACLProvider(conf: KyuubiConf) extends ACLProvider {
       nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
     }
 
-    if (conf.get(HighAvailabilityConf.HA_ZK_ENGINE_REF_ID).isEmpty && enabledServerAcls()) {
+    if (conf.get(HighAvailabilityConf.HA_ENGINE_REF_ID).isEmpty && enabledServerAcls()) {
       addACL()
-    } else if (conf.get(HighAvailabilityConf.HA_ZK_ENGINE_REF_ID).nonEmpty && enabledEngineAcls()) {
+    } else if (conf.get(HighAvailabilityConf.HA_ENGINE_REF_ID).nonEmpty && enabledEngineAcls()) {
       addACL()
     } else {
       // ACLs for znodes on a non-kerberized cluster
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
index 5ea121e79..0cfc3af4a 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
@@ -119,10 +119,10 @@ object ZookeeperClientProvider extends Logging {
       }
     }
 
-    if (conf.get(HA_ZK_ENGINE_REF_ID).isEmpty
+    if (conf.get(HA_ENGINE_REF_ID).isEmpty
       && AuthTypes.withName(conf.get(HA_ZK_AUTH_TYPE)) == AuthTypes.KERBEROS) {
       setupZkAuth()
-    } else if (conf.get(HA_ZK_ENGINE_REF_ID).nonEmpty && AuthTypes
+    } else if (conf.get(HA_ENGINE_REF_ID).nonEmpty && AuthTypes
         .withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == AuthTypes.KERBEROS) {
       setupZkAuth()
     }
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
index d1ce32685..5b6beaa5d 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -46,7 +46,7 @@ import org.apache.kyuubi.KyuubiException
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ENGINE_REF_ID
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLISH_CONFIGS
 import org.apache.kyuubi.ha.client.DiscoveryClient
@@ -344,7 +344,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
         throw new KyuubiException(s"Failed to create namespace '$ns'", e)
     }
 
-    val session = conf.get(HA_ZK_ENGINE_REF_ID)
+    val session = conf.get(HA_ENGINE_REF_ID)
       .map(refId => s"refId=$refId;").getOrElse("")
     val pathPrefix = ZKPaths.makePath(
       namespace,
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
new file mode 100644
index 000000000..41bc92f64
--- /dev/null
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.kyuubi.KYUUBI_VERSION
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_AUTH_TYPE
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+import org.apache.kyuubi.service.NoopTBinaryFrontendServer
+import org.apache.kyuubi.service.NoopTBinaryFrontendService
+import org.apache.kyuubi.service.Serverable
+import org.apache.kyuubi.service.Service
+import org.apache.kyuubi.service.ServiceState
+
+trait DiscoveryClientTests extends KyuubiFunSuite {
+  protected val conf: KyuubiConf
+
+  protected def getConnectString(): String
+
+  test("publish instance to embedded zookeeper server") {
+    val namespace = "kyuubiserver"
+
+    conf
+      .unset(KyuubiConf.SERVER_KEYTAB)
+      .unset(KyuubiConf.SERVER_PRINCIPAL)
+      .set(HA_ADDRESSES, getConnectString)
+      .set(HA_NAMESPACE, namespace)
+      .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+
+    var serviceDiscovery: KyuubiServiceDiscovery = null
+    val server: Serverable = new NoopTBinaryFrontendServer() {
+      override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+        new NoopTBinaryFrontendService(this) {
+          override val discoveryService: Option[Service] = {
+            serviceDiscovery = new KyuubiServiceDiscovery(this)
+            Some(serviceDiscovery)
+          }
+        })
+    }
+    server.initialize(conf)
+    server.start()
+    val znodeRoot = s"/$namespace"
+    withDiscoveryClient(conf) { framework =>
+      try {
+        assert(framework.pathNonExists("/abc"))
+        assert(framework.pathExists(znodeRoot))
+        val children = framework.getChildren(znodeRoot)
+        assert(children.head ===
+          s"serviceUri=${server.frontendServices.head.connectionUrl};" +
+          s"version=$KYUUBI_VERSION;sequence=0000000000")
+
+        children.foreach { child =>
+          framework.delete(s"""$znodeRoot/$child""")
+        }
+        eventually(timeout(5.seconds), interval(100.millis)) {
+          assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
+          assert(server.getServiceState === ServiceState.STOPPED)
+        }
+      } finally {
+        server.stop()
+      }
+    }
+  }
+
+  test("KYUUBI-304: Stop engine service gracefully when related zk node is deleted") {
+    val logAppender = new LogAppender("test stop engine gracefully")
+    withLogAppender(logAppender) {
+      val namespace = "kyuubiengine"
+
+      conf
+        .unset(KyuubiConf.SERVER_KEYTAB)
+        .unset(KyuubiConf.SERVER_PRINCIPAL)
+        .set(HA_ADDRESSES, getConnectString())
+        .set(HA_NAMESPACE, namespace)
+        .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+        .set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
+
+      var serviceDiscovery: KyuubiServiceDiscovery = null
+      val server: Serverable = new NoopTBinaryFrontendServer() {
+        override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+          new NoopTBinaryFrontendService(this) {
+            override val discoveryService: Option[Service] = {
+              serviceDiscovery = new KyuubiServiceDiscovery(this)
+              Some(serviceDiscovery)
+            }
+          })
+      }
+      server.initialize(conf)
+      server.start()
+
+      val znodeRoot = s"/$namespace"
+      withDiscoveryClient(conf) { framework =>
+        try {
+
+          assert(framework.pathNonExists("/abc"))
+          assert(framework.pathExists(znodeRoot))
+          val children = framework.getChildren(znodeRoot)
+          assert(children.head ===
+            s"serviceUri=${server.frontendServices.head.connectionUrl};" +
+            s"version=$KYUUBI_VERSION;sequence=0000000000")
+
+          children.foreach { child =>
+            framework.delete(s"""$znodeRoot/$child""")
+          }
+          eventually(timeout(5.seconds), interval(100.millis)) {
+            assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
+            assert(server.getServiceState === ServiceState.STOPPED)
+            val msg = s"This Kyuubi instance ${server.frontendServices.head.connectionUrl}" +
+              s" is now de-registered"
+            assert(logAppender.loggingEvents.exists(
+              _.getMessage.getFormattedMessage.contains(msg)))
+          }
+        } finally {
+          server.stop()
+          serviceDiscovery.stop()
+        }
+      }
+    }
+  }
+
+  test("parse host and port from instance string") {
+    val host = "127.0.0.1"
+    val port = 10009
+    val instance1 = s"$host:$port"
+    val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
+    assert(host === host1)
+    assert(port === port1)
+
+    val instance2 = s"hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=$host;" +
+      s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
+      s"hive.server2.thrift.port=$port;" +
+      s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
+    val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
+    assert(host === host2)
+    assert(port === port2)
+  }
+}
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala
new file mode 100644
index 000000000..b0de297dc
--- /dev/null
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client.etcd
+
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import io.etcd.jetcd.launcher.Etcd
+import io.etcd.jetcd.launcher.EtcdCluster
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_CLIENT_CLASS
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryClientTests
+import org.apache.kyuubi.service.NoopTBinaryFrontendServer
+
+class EtcdDiscoveryClientSuite extends DiscoveryClientTests {
+  private var etcdCluster: EtcdCluster = _
+  var engineServer: NoopTBinaryFrontendServer = _
+
+  private lazy val _connectString = etcdCluster.clientEndpoints().asScala.mkString(",")
+
+  override def getConnectString(): String = _connectString
+
+  val conf: KyuubiConf = {
+    KyuubiConf()
+      .set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
+  }
+
+  override def beforeAll(): Unit = {
+    etcdCluster = new Etcd.Builder().withNodes(1).build()
+    etcdCluster.start()
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    if (etcdCluster != null) {
+      etcdCluster.close()
+    }
+    super.afterAll()
+  }
+
+  test("etcd test: set, get and delete") {
+    withDiscoveryClient(conf) { discoveryClient =>
+      val path = "/kyuubi"
+      // set
+      discoveryClient.create(path, "PERSISTENT")
+      assert(discoveryClient.pathExists(path))
+
+      // get
+      assert(new String(discoveryClient.getData(path), StandardCharsets.UTF_8) == path)
+
+      // delete
+      discoveryClient.delete(path)
+      assert(!discoveryClient.pathExists(path))
+    }
+  }
+}
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index afa744324..8b87d4aad 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -30,22 +30,21 @@ import org.apache.zookeeper.ZooDefs
 import org.apache.zookeeper.data.ACL
 import org.scalatest.time.SpanSugar._
 
-import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION}
+import org.apache.kyuubi.KerberizedTestHelper
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.AuthTypes
-import org.apache.kyuubi.ha.client.DiscoveryClient
-import org.apache.kyuubi.ha.client.DiscoveryClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryClientTests
 import org.apache.kyuubi.ha.client.EngineServiceDiscovery
-import org.apache.kyuubi.ha.client.KyuubiServiceDiscovery
 import org.apache.kyuubi.service._
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
-class ZookeeperDiscoveryClientSuite extends KerberizedTestHelper {
-  import DiscoveryClientProvider._
+class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests with KerberizedTestHelper {
 
   val zkServer = new EmbeddedZookeeper()
-  val conf: KyuubiConf = KyuubiConf()
+  override val conf: KyuubiConf = KyuubiConf()
+
+  override def getConnectString(): String = zkServer.getConnectString
 
   override def beforeAll(): Unit = {
     conf.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
@@ -62,51 +61,6 @@ class ZookeeperDiscoveryClientSuite extends KerberizedTestHelper {
     super.afterAll()
   }
 
-  test("publish instance to embedded zookeeper server") {
-    val namespace = "kyuubiserver"
-
-    conf
-      .unset(KyuubiConf.SERVER_KEYTAB)
-      .unset(KyuubiConf.SERVER_PRINCIPAL)
-      .set(HA_ADDRESSES, zkServer.getConnectString)
-      .set(HA_NAMESPACE, namespace)
-      .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
-
-    var serviceDiscovery: KyuubiServiceDiscovery = null
-    val server: Serverable = new NoopTBinaryFrontendServer() {
-      override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
-        new NoopTBinaryFrontendService(this) {
-          override val discoveryService: Option[Service] = {
-            serviceDiscovery = new KyuubiServiceDiscovery(this)
-            Some(serviceDiscovery)
-          }
-        })
-    }
-    server.initialize(conf)
-    server.start()
-    val znodeRoot = s"/$namespace"
-    withDiscoveryClient(conf) { framework =>
-      try {
-        assert(framework.pathNonExists("/abc"))
-        assert(framework.pathExists(znodeRoot))
-        val children = framework.getChildren(znodeRoot)
-        assert(children.head ===
-          s"serviceUri=${server.frontendServices.head.connectionUrl};" +
-          s"version=$KYUUBI_VERSION;sequence=0000000000")
-
-        children.foreach { child =>
-          framework.delete(s"""$znodeRoot/$child""")
-        }
-        eventually(timeout(5.seconds), interval(100.millis)) {
-          assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
-          assert(server.getServiceState === ServiceState.STOPPED)
-        }
-      } finally {
-        server.stop()
-      }
-    }
-  }
-
   test("acl for zookeeper") {
     val expectedNoACL = new util.ArrayList[ACL](ZooDefs.Ids.OPEN_ACL_UNSAFE)
     val expectedEnableACL = new util.ArrayList[ACL](ZooDefs.Ids.READ_ACL_UNSAFE)
@@ -124,12 +78,12 @@ class ZookeeperDiscoveryClientSuite extends KerberizedTestHelper {
     val serverACL = new ZookeeperACLProvider(serverConf).getDefaultAcl
     assertACL(expectedEnableACL, serverACL)
 
-    val engineConf = serverConf.clone.set(HA_ZK_ENGINE_REF_ID, "ref")
+    val engineConf = serverConf.clone.set(HA_ENGINE_REF_ID, "ref")
     engineConf.set(HA_ZK_ENGINE_AUTH_TYPE, AuthTypes.NONE.toString)
     val engineACL = new ZookeeperACLProvider(engineConf).getDefaultAcl
     assertACL(expectedNoACL, engineACL)
 
-    val enableEngineACLConf = serverConf.clone.set(HA_ZK_ENGINE_REF_ID, "ref")
+    val enableEngineACLConf = serverConf.clone.set(HA_ENGINE_REF_ID, "ref")
     enableEngineACLConf.set(HA_ZK_ENGINE_AUTH_TYPE, AuthTypes.KERBEROS.toString)
     val enableEngineACL = new ZookeeperACLProvider(enableEngineACLConf).getDefaultAcl
     assertACL(expectedEnableACL, enableEngineACL)
@@ -161,79 +115,6 @@ class ZookeeperDiscoveryClientSuite extends KerberizedTestHelper {
     }
   }
 
-  test("KYUUBI-304: Stop engine service gracefully when related zk node is deleted") {
-    val logAppender = new LogAppender("test stop engine gracefully")
-    withLogAppender(logAppender) {
-      val namespace = "kyuubiengine"
-
-      conf
-        .unset(KyuubiConf.SERVER_KEYTAB)
-        .unset(KyuubiConf.SERVER_PRINCIPAL)
-        .set(HA_ADDRESSES, zkServer.getConnectString)
-        .set(HA_NAMESPACE, namespace)
-        .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
-        .set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
-
-      var serviceDiscovery: KyuubiServiceDiscovery = null
-      val server: Serverable = new NoopTBinaryFrontendServer() {
-        override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
-          new NoopTBinaryFrontendService(this) {
-            override val discoveryService: Option[Service] = {
-              serviceDiscovery = new KyuubiServiceDiscovery(this)
-              Some(serviceDiscovery)
-            }
-          })
-      }
-      server.initialize(conf)
-      server.start()
-
-      val znodeRoot = s"/$namespace"
-      withDiscoveryClient(conf) { framework =>
-        try {
-
-          assert(framework.pathNonExists("/abc"))
-          assert(framework.pathExists(znodeRoot))
-          val children = framework.getChildren(znodeRoot)
-          assert(children.head ===
-            s"serviceUri=${server.frontendServices.head.connectionUrl};" +
-            s"version=$KYUUBI_VERSION;sequence=0000000000")
-
-          children.foreach { child =>
-            framework.delete(s"""$znodeRoot/$child""")
-          }
-          eventually(timeout(5.seconds), interval(100.millis)) {
-            assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
-            assert(server.getServiceState === ServiceState.STOPPED)
-            val msg = s"This Kyuubi instance ${server.frontendServices.head.connectionUrl}" +
-              s" is now de-registered"
-            assert(logAppender.loggingEvents.exists(
-              _.getMessage.getFormattedMessage.contains(msg)))
-          }
-        } finally {
-          server.stop()
-          serviceDiscovery.stop()
-        }
-      }
-    }
-  }
-
-  test("parse host and port from instance string") {
-    val host = "127.0.0.1"
-    val port = 10009
-    val instance1 = s"$host:$port"
-    val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
-    assert(host === host1)
-    assert(port === port1)
-
-    val instance2 = s"hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=$host;" +
-      s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
-      s"hive.server2.thrift.port=$port;" +
-      s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
-    val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
-    assert(host === host2)
-    assert(port === port2)
-  }
-
   test("stop engine in time while zk ensemble terminates") {
     val zkServer = new EmbeddedZookeeper()
     val conf = KyuubiConf()
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 9c04d2294..c5013db44 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -277,6 +277,16 @@
             <groupId>org.apache.kyuubi</groupId>
             <artifactId>kyuubi-spark-sql-engine_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.grpc</groupId>
+                    <artifactId>grpc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.grpc</groupId>
+                    <artifactId>grpc-protobuf</artifactId>
+                </exclusion>
+            </exclusions>
             <scope>test</scope>
         </dependency>
 
@@ -461,6 +471,12 @@
             <artifactId>scala-compiler</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>io.etcd</groupId>
+            <artifactId>jetcd-launcher</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 34acd5889..6cbd1ff94 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -35,7 +35,7 @@ import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
 import org.apache.kyuubi.engine.hive.HiveProcessBuilder
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_NAMESPACE, HA_ZK_ENGINE_REF_ID}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
 import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths}
 import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
 import org.apache.kyuubi.metrics.MetricsSystem
@@ -167,7 +167,7 @@ private[kyuubi] class EngineRef(
     if (engineRef.nonEmpty) return engineRef.get
 
     conf.set(HA_NAMESPACE, engineSpace)
-    conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
+    conf.set(HA_ENGINE_REF_ID, engineRefId)
     val started = System.currentTimeMillis()
     conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
     builder = engineType match {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
similarity index 81%
rename from kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
rename to kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
index 096ccc656..538a36805 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
@@ -23,32 +23,28 @@ import java.util.concurrent.Executors
 import org.apache.hadoop.security.UserGroupInformation
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
-import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, Utils}
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
+import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.EngineType._
+import org.apache.kyuubi.engine.ShareLevel._
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider
 import org.apache.kyuubi.ha.client.DiscoveryPaths
 import org.apache.kyuubi.metrics.MetricsConstants.ENGINE_TOTAL
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.util.NamedThreadFactory
-import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
-
-class EngineRefSuite extends KyuubiFunSuite {
-  import EngineType._
-  import ShareLevel._
-  private val zkServer = new EmbeddedZookeeper
-  private val conf = KyuubiConf()
-  private val user = Utils.currentUser
+
+trait EngineRefTests extends KyuubiFunSuite {
+
+  protected val user = Utils.currentUser
   private val metricsSystem = new MetricsSystem
 
+  protected val conf: KyuubiConf
+  protected def getConnectString(): String
+
   override def beforeAll(): Unit = {
-    val zkData = Utils.createTempDir()
-    conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString)
-      .set(ZookeeperConf.ZK_CLIENT_PORT, 0)
-      .set("spark.sql.catalogImplementation", "in-memory")
-    zkServer.initialize(conf)
-    zkServer.start()
     metricsSystem.initialize(conf)
     metricsSystem.start()
     super.beforeAll()
@@ -56,7 +52,6 @@ class EngineRefSuite extends KyuubiFunSuite {
 
   override def afterAll(): Unit = {
     metricsSystem.stop()
-    zkServer.stop()
     super.afterAll()
   }
 
@@ -222,7 +217,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
     conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
     conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test")
-    conf.set(HighAvailabilityConf.HA_ADDRESSES, zkServer.getConnectString)
+    conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString())
     val engine = new EngineRef(conf, user, id, null)
 
     var port1 = 0
@@ -257,53 +252,6 @@ class EngineRefSuite extends KyuubiFunSuite {
     }
   }
 
-  // KYUUBI #2827 remove all engines dependencies except to spark from server
-  ignore("different engine type should use its own lock") {
-    conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
-    conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
-    conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
-    conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test1")
-    conf.set(HighAvailabilityConf.HA_ADDRESSES, zkServer.getConnectString)
-    val conf1 = conf.clone
-    conf1.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
-    val conf2 = conf.clone
-    conf2.set(KyuubiConf.ENGINE_TYPE, HIVE_SQL.toString)
-
-    val start = System.currentTimeMillis()
-    val times = new Array[Long](2)
-    val executor = Executors.newFixedThreadPool(2)
-    try {
-      executor.execute(() => {
-        DiscoveryClientProvider.withDiscoveryClient(conf1) { client =>
-          try {
-            new EngineRef(conf1, user, UUID.randomUUID().toString, null)
-              .getOrCreate(client)
-          } finally {
-            times(0) = System.currentTimeMillis()
-          }
-        }
-      })
-      executor.execute(() => {
-        DiscoveryClientProvider.withDiscoveryClient(conf2) { client =>
-          try {
-            new EngineRef(conf2, user, UUID.randomUUID().toString, null)
-              .getOrCreate(client)
-          } finally {
-            times(1) = System.currentTimeMillis()
-          }
-        }
-      })
-
-      eventually(timeout(10.seconds), interval(200.milliseconds)) {
-        assert(times.forall(_ > start))
-        // ENGINE_INIT_TIMEOUT is 3000ms
-        assert(times.max - times.min < 2500)
-      }
-    } finally {
-      executor.shutdown()
-    }
-  }
-
   test("three same lock request with initialization timeout") {
     val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
@@ -311,7 +259,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
     conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
     conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test2")
-    conf.set(HighAvailabilityConf.HA_ADDRESSES, zkServer.getConnectString)
+    conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString())
 
     val beforeEngines = MetricsSystem.counterValue(ENGINE_TOTAL).getOrElse(0L)
     val start = System.currentTimeMillis()
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithEtcdSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithEtcdSuite.scala
new file mode 100644
index 000000000..80bb09e20
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithEtcdSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import scala.collection.JavaConverters._
+
+import io.etcd.jetcd.launcher.Etcd
+import io.etcd.jetcd.launcher.EtcdCluster
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_CLIENT_CLASS
+import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient
+
+class EngineRefWithEtcdSuite extends EngineRefTests {
+
+  private var etcdCluster: EtcdCluster = _
+  private lazy val _connectString: String = etcdCluster.clientEndpoints().asScala.mkString(",")
+
+  override protected val conf: KyuubiConf = {
+    KyuubiConf()
+      .set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
+      .set("spark.sql.catalogImplementation", "in-memory")
+  }
+
+  override protected def getConnectString(): String = _connectString
+
+  override def beforeAll(): Unit = {
+    etcdCluster = new Etcd.Builder().withNodes(1).build()
+    etcdCluster.start()
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    if (etcdCluster != null) {
+      etcdCluster.close()
+    }
+    super.afterAll()
+  }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithZookeeperSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithZookeeperSuite.scala
new file mode 100644
index 000000000..75dd5cc09
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithZookeeperSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import java.util.UUID
+import java.util.concurrent.Executors
+
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.EngineType.HIVE_SQL
+import org.apache.kyuubi.engine.EngineType.SPARK_SQL
+import org.apache.kyuubi.engine.ShareLevel.USER
+import org.apache.kyuubi.ha.HighAvailabilityConf
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
+import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
+import org.apache.kyuubi.zookeeper.ZookeeperConf
+
+class EngineRefWithZookeeperSuite extends EngineRefTests {
+
+  private val zkServer = new EmbeddedZookeeper
+  val conf = KyuubiConf()
+
+  override def beforeAll(): Unit = {
+    val zkData = Utils.createTempDir()
+    conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString)
+      .set(ZookeeperConf.ZK_CLIENT_PORT, 0)
+      .set("spark.sql.catalogImplementation", "in-memory")
+    zkServer.initialize(conf)
+    zkServer.start()
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    zkServer.stop()
+    super.afterAll()
+  }
+
+  def getConnectString(): String = zkServer.getConnectString
+
+  // TODO move to EngineRefTests when etcd discovery supports more engines
+  // KYUUBI #2827 remove all engines dependencies except to spark from server
+  ignore("different engine type should use its own lock") {
+    conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
+    conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+    conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
+    conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test1")
+    conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString())
+    val conf1 = conf.clone
+    conf1.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+    val conf2 = conf.clone
+    conf2.set(KyuubiConf.ENGINE_TYPE, HIVE_SQL.toString)
+
+    val start = System.currentTimeMillis()
+    val times = new Array[Long](2)
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      executor.execute(() => {
+        DiscoveryClientProvider.withDiscoveryClient(conf1) { client =>
+          try {
+            new EngineRef(conf1, user, UUID.randomUUID().toString, null)
+              .getOrCreate(client)
+          } finally {
+            times(0) = System.currentTimeMillis()
+          }
+        }
+      })
+      executor.execute(() => {
+        DiscoveryClientProvider.withDiscoveryClient(conf2) { client =>
+          try {
+            new EngineRef(conf2, user, UUID.randomUUID().toString, null)
+              .getOrCreate(client)
+          } finally {
+            times(1) = System.currentTimeMillis()
+          }
+        }
+      })
+
+      eventually(timeout(10.seconds), interval(200.milliseconds)) {
+        assert(times.forall(_ > start))
+        // ENGINE_INIT_TIMEOUT is 3000ms
+        assert(times.max - times.min < 2500)
+      }
+    } finally {
+      executor.shutdown()
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 40b38bc7d..155141868 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,6 +111,7 @@
         <commons-lang.version>2.6</commons-lang.version>
         <commons-lang3.version>3.10</commons-lang3.version>
         <curator.version>2.12.0</curator.version>
+        <etcd.version>0.7.1</etcd.version>
         <delta.version>2.0.0rc1</delta.version>
         <fb303.version>0.9.3</fb303.version>
         <flink.version>1.14.5</flink.version>
@@ -119,6 +120,7 @@
         <flink.archive.mirror>${apache.archive.dist}/flink/flink-${flink.version}</flink.archive.mirror>
         <flink.archive.download.skip>false</flink.archive.download.skip>
         <google.jsr305.version>3.0.2</google.jsr305.version>
+        <grpc.version>1.47.0</grpc.version>
         <gson.version>2.8.9</gson.version>
         <guava.version>30.1-jre</guava.version>
         <guava.failureaccess.version>1.0.1</guava.failureaccess.version>
@@ -1023,6 +1025,54 @@
                 </exclusions>
             </dependency>
 
+            <dependency>
+                <groupId>io.etcd</groupId>
+                <artifactId>jetcd-core</artifactId>
+                <version>${etcd.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>javax.annotation</groupId>
+                        <artifactId>javax.annotation-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>io.etcd</groupId>
+                <artifactId>jetcd-launcher</artifactId>
+                <version>${etcd.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-core</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-grpclb</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-netty</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-protobuf</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-stub</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.apache.iceberg</groupId>
                 <artifactId>${iceberg.name}</artifactId>