You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/13 01:32:25 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2040] [TEST] Add Zookeeper 3.4/3.5/3.6/3.7 integration test

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c24a192dc [KYUUBI #2040] [TEST] Add Zookeeper 3.4/3.5/3.6/3.7 integration test
c24a192dc is described below

commit c24a192dcb0b3c1490ee337e5f0bd37ee81f3679
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Sep 13 09:32:16 2022 +0800

    [KYUUBI #2040] [TEST] Add Zookeeper 3.4/3.5/3.6/3.7 integration test
    
    ### _Why are the changes needed?_
    
    This PR proposes to add Testcontainers based Zookeeper 3.4/3.5/3.6/3.7 integration test, we need it to verify the compatibility if we upgrade Zookeeper client to the new version. See details in #1941
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #2040 from pan3793/zk-3-4-it.
    
    Closes #2040
    
    929540f6 [Cheng Pan] [TEST] Add Zookeeper 3.4/3.5/3.6 integration test
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .github/workflows/master.yml                       |  34 +++++
 .../kyuubi-zookeeper-it}/pom.xml                   |  85 ++++---------
 .../DockerizedZkServiceDiscoverySuite.scala        |  46 +++++++
 .../src/test/resources/log4j2-test.xml             |  43 +++++++
 integration-tests/pom.xml                          |   1 +
 kyuubi-ha/pom.xml                                  |  16 +++
 .../apache/kyuubi/ha/client/DiscoveryClient.scala  |   8 +-
 ...viderSuite.scala => DiscoveryClientSuite.scala} |  27 ++--
 .../kyuubi/ha/client/DiscoveryClientTests.scala    | 140 ++++++++++-----------
 .../ha/client/etcd/EtcdDiscoveryClientSuite.scala  |  14 ++-
 .../zookeeper/ZookeeperDiscoveryClientSuite.scala  | 118 ++++++++---------
 11 files changed, 320 insertions(+), 212 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 0c815b2e8..bccfec36c 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -404,3 +404,37 @@ jobs:
           path: |
             **/target/unit-tests.log
             **/kyuubi-spark-sql-engine.log*
+
+  zookeeper-it:
+    name: Zookeeper Integration Test
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+      matrix:
+        java: [ 8 ]
+        zookeeper: ["3.4", "3.5", "3.6", "3.7" ]
+        comment: [ "normal" ]
+    steps:
+      - uses: actions/checkout@v2
+      - name: Tune Runner VM
+        uses: ./.github/actions/tune-runner-vm
+      - name: Setup JDK ${{ matrix.java }}
+        uses: actions/setup-java@v2
+        with:
+          distribution: zulu
+          java-version: ${{ matrix.java }}
+          cache: 'maven'
+          check-latest: false
+      - name: zookeeper integration tests
+        run: |
+          export KYUUBI_IT_ZOOKEEPER_VERSION=${{ matrix.zookeeper }}
+          TEST_MODULES="integration-tests/kyuubi-zookeeper-it"
+          ./build/mvn ${MVN_OPT} -pl ${TEST_MODULES} -am clean install -DskipTests
+          ./build/mvn ${MVN_OPT} -pl ${TEST_MODULES} test
+      - name: Upload test logs
+        if: failure()
+        uses: actions/upload-artifact@v2
+        with:
+          name: unit-tests-log-java-${{ matrix.java }}-zookeeper-${{ matrix.comment }}
+          path: |
+            **/target/unit-tests.log
diff --git a/kyuubi-ha/pom.xml b/integration-tests/kyuubi-zookeeper-it/pom.xml
similarity index 57%
copy from kyuubi-ha/pom.xml
copy to integration-tests/kyuubi-zookeeper-it/pom.xml
index e7b89183f..19882efdb 100644
--- a/kyuubi-ha/pom.xml
+++ b/integration-tests/kyuubi-zookeeper-it/pom.xml
@@ -7,7 +7,7 @@
   ~ (the "License"); you may not use this file except in compliance with
   ~ the License.  You may obtain a copy of the License at
   ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~     http://www.apache.org/licenses/LICENSE-2.0
   ~
   ~ Unless required by applicable law or agreed to in writing, software
   ~ distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,84 +20,35 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
+        <artifactId>integration-tests</artifactId>
         <groupId>org.apache.kyuubi</groupId>
-        <artifactId>kyuubi-parent</artifactId>
         <version>1.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>kyuubi-ha_2.12</artifactId>
-    <name>Kyuubi Project High Availability</name>
-    <packaging>jar</packaging>
+    <artifactId>kyuubi-zookeeper-it_2.12</artifactId>
+    <name>Kyuubi Test Zookeeper IT</name>
     <url>https://kyuubi.apache.org/</url>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
-            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+            <artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>io.etcd</groupId>
-            <artifactId>jetcd-core</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-core</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-grpclb</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-netty</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-protobuf</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-stub</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>io.etcd</groupId>
-            <artifactId>jetcd-launcher</artifactId>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
-            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+            <artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
@@ -110,15 +61,21 @@
         </dependency>
 
         <dependency>
-            <groupId>com.unboundid</groupId>
-            <artifactId>unboundid-ldapsdk</artifactId>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
+            <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.kyuubi</groupId>
-            <artifactId>kyuubi-zookeeper_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
+            <groupId>com.dimafeng</groupId>
+            <artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/integration-tests/kyuubi-zookeeper-it/src/test/java/org/apache/kyuubi/it/zookeeper/DockerizedZkServiceDiscoverySuite.scala b/integration-tests/kyuubi-zookeeper-it/src/test/java/org/apache/kyuubi/it/zookeeper/DockerizedZkServiceDiscoverySuite.scala
new file mode 100644
index 000000000..87eecf578
--- /dev/null
+++ b/integration-tests/kyuubi-zookeeper-it/src/test/java/org/apache/kyuubi/it/zookeeper/DockerizedZkServiceDiscoverySuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.it.zookeeper
+
+import com.dimafeng.testcontainers.{GenericContainer, SingleContainer}
+import org.testcontainers.containers.wait.strategy.Wait
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClientSuite
+
+class DockerizedZkServiceDiscoverySuite extends ZookeeperDiscoveryClientSuite {
+
+  private val zkClientPort = 2181
+  private val zkVersion = sys.env.getOrElse("KYUUBI_IT_ZOOKEEPER_VERSION", "3.4")
+  private val zkImage = sys.env.getOrElse("KYUUBI_IT_ZOOKEEPER_IMAGE", s"zookeeper:$zkVersion")
+
+  val container: SingleContainer[_] = GenericContainer(
+    dockerImage = zkImage,
+    exposedPorts = Seq(zkClientPort),
+    waitStrategy = Wait.forListeningPort)
+
+  override def getConnectString: String = s"${container.host}:${container.mappedPort(zkClientPort)}"
+
+  override def startZk(): Unit = synchronized {
+    container.start()
+  }
+
+  override def stopZk(): Unit = synchronized {
+    Utils.tryLogNonFatalError { container.stop() }
+  }
+}
diff --git a/integration-tests/kyuubi-zookeeper-it/src/test/resources/log4j2-test.xml b/integration-tests/kyuubi-zookeeper-it/src/test/resources/log4j2-test.xml
new file mode 100644
index 000000000..bfc40dd6d
--- /dev/null
+++ b/integration-tests/kyuubi-zookeeper-it/src/test/resources/log4j2-test.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!-- Extra logging related to initialization of Log4j. 
+ Set to debug or trace if log4j initialization is failing. -->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="stdout" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} %p %c: %m%n"/>
+            <Filters>
+                <ThresholdFilter level="FATAL"/>
+                <RegexFilter regex=".*Thrift error occurred during processing of message.*" onMatch="DENY" onMismatch="NEUTRAL"/>
+            </Filters>
+        </Console>
+        <File name="file" fileName="target/unit-tests.log">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n"/>
+            <Filters>
+                <RegexFilter regex=".*Thrift error occurred during processing of message.*" onMatch="DENY" onMismatch="NEUTRAL"/>
+            </Filters>
+        </File>
+    </Appenders>
+    <Loggers>
+        <Root level="INFO">
+            <AppenderRef ref="stdout"/>
+            <AppenderRef ref="file"/>
+        </Root>
+    </Loggers>
+</Configuration>
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index c1b80974d..145813924 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -36,6 +36,7 @@
         <module>kyuubi-hive-it</module>
         <module>kyuubi-trino-it</module>
         <module>kyuubi-jdbc-it</module>
+        <module>kyuubi-zookeeper-it</module>
     </modules>
 
     <profiles>
diff --git a/kyuubi-ha/pom.xml b/kyuubi-ha/pom.xml
index e7b89183f..3bee82803 100644
--- a/kyuubi-ha/pom.xml
+++ b/kyuubi-ha/pom.xml
@@ -126,5 +126,21 @@
     <build>
         <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
         <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>prepare-test-jar</id>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
     </build>
 </project>
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
index c20799a1c..522487ef7 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
@@ -174,13 +174,13 @@ object DiscoveryClient {
   private[client] def parseInstanceHostPort(instance: String): (String, Int) = {
     val maybeInfos = instance.split(";")
       .map(_.split("=", 2))
-      .filter(_.size == 2)
+      .filter(_.length == 2)
       .map(i => (i(0), i(1)))
       .toMap
-    if (maybeInfos.size > 0) {
+    if (maybeInfos.nonEmpty) {
       (
-        maybeInfos.get("hive.server2.thrift.bind.host").get,
-        maybeInfos.get("hive.server2.thrift.port").get.toInt)
+        maybeInfos("hive.server2.thrift.bind.host"),
+        maybeInfos("hive.server2.thrift.port").toInt)
     } else {
       val strings = instance.split(":")
       (strings(0), strings(1).toInt)
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientSuite.scala
similarity index 54%
rename from kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala
rename to kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientSuite.scala
index ca2a4ba88..5a1af35f6 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientSuite.scala
@@ -18,16 +18,23 @@
 package org.apache.kyuubi.ha.client
 
 import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.config.KyuubiConf
 
-class DiscoveryClientProviderSuite extends KyuubiFunSuite {
-  test("discovery") {
-    val conf = KyuubiConf()
-    DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient =>
-      discoveryClient.getServerHost("/kyuubi")
-    }
-    DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient =>
-      discoveryClient.getServerHost("/kyuubi")
-    }
+trait DiscoveryClientSuite extends KyuubiFunSuite {
+
+  test("parse host and port from instance string") {
+    val host = "127.0.0.1"
+    val port = 10009
+    val instance1 = s"$host:$port"
+    val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
+    assert(host === host1)
+    assert(port === port1)
+
+    val instance2 = s"hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=$host;" +
+      s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
+      s"hive.server2.thrift.port=$port;" +
+      s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
+    val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
+    assert(host === host2)
+    assert(port === port2)
   }
 }
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
index 4236facd7..585b51bba 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
@@ -17,141 +17,137 @@
 
 package org.apache.kyuubi.ha.client
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
 import org.scalatest.time.SpanSugar._
 
-import org.apache.kyuubi.KYUUBI_VERSION
-import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, KyuubiSQLException}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_AUTH_TYPE
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_NAMESPACE}
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
-import org.apache.kyuubi.service.NoopTBinaryFrontendServer
-import org.apache.kyuubi.service.NoopTBinaryFrontendService
-import org.apache.kyuubi.service.Serverable
-import org.apache.kyuubi.service.Service
-import org.apache.kyuubi.service.ServiceState
+import org.apache.kyuubi.service._
 
 trait DiscoveryClientTests extends KyuubiFunSuite {
-  protected val conf: KyuubiConf
+
+  protected def conf: KyuubiConf
 
   protected def getConnectString: String
 
-  test("publish instance to embedded zookeeper server") {
+  test("publish instance to discovery service") {
     val namespace = "kyuubiserver"
 
     conf
-      .unset(KyuubiConf.SERVER_KEYTAB)
-      .unset(KyuubiConf.SERVER_PRINCIPAL)
       .set(HA_ADDRESSES, getConnectString)
       .set(HA_NAMESPACE, namespace)
       .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
 
-    var serviceDiscovery: KyuubiServiceDiscovery = null
-    val server: Serverable = new NoopTBinaryFrontendServer() {
+    var discovery: ServiceDiscovery = null
+    val service: Serverable = new NoopTBinaryFrontendServer() {
       override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
         new NoopTBinaryFrontendService(this) {
           override val discoveryService: Option[Service] = {
-            serviceDiscovery = new KyuubiServiceDiscovery(this)
-            Some(serviceDiscovery)
+            discovery = new KyuubiServiceDiscovery(this)
+            Some(discovery)
           }
         })
     }
-    server.initialize(conf)
-    server.start()
-    val znodeRoot = s"/$namespace"
-    withDiscoveryClient(conf) { framework =>
-      try {
-        assert(framework.pathNonExists("/abc"))
-        assert(framework.pathExists(znodeRoot))
-        val children = framework.getChildren(znodeRoot)
+    service.initialize(conf)
+    service.start()
+    val basePath = s"/$namespace"
+    try {
+      withDiscoveryClient(conf) { discoveryClient =>
+        assert(discoveryClient.pathNonExists("/abc"))
+        assert(discoveryClient.pathExists(basePath))
+        val children = discoveryClient.getChildren(basePath)
         assert(children.head ===
-          s"serviceUri=${server.frontendServices.head.connectionUrl};" +
+          s"serviceUri=${service.frontendServices.head.connectionUrl};" +
           s"version=$KYUUBI_VERSION;sequence=0000000000")
 
         children.foreach { child =>
-          framework.delete(s"""$znodeRoot/$child""")
+          discoveryClient.delete(s"$basePath/$child")
         }
         eventually(timeout(5.seconds), interval(100.millis)) {
-          assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
-          assert(server.getServiceState === ServiceState.STOPPED)
+          assert(discovery.getServiceState === ServiceState.STOPPED)
+          assert(service.getServiceState === ServiceState.STOPPED)
         }
-      } finally {
-        server.stop()
       }
+    } finally {
+      service.stop()
+      discovery.stop()
     }
   }
 
-  test("KYUUBI-304: Stop engine service gracefully when related zk node is deleted") {
+  test("KYUUBI #304: Stop engine service gracefully when related node is deleted") {
     val logAppender = new LogAppender("test stop engine gracefully")
     withLogAppender(logAppender) {
       val namespace = "kyuubiengine"
 
       conf
-        .unset(KyuubiConf.SERVER_KEYTAB)
-        .unset(KyuubiConf.SERVER_PRINCIPAL)
         .set(HA_ADDRESSES, getConnectString)
         .set(HA_NAMESPACE, namespace)
         .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
-        .set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
 
-      var serviceDiscovery: KyuubiServiceDiscovery = null
-      val server: Serverable = new NoopTBinaryFrontendServer() {
+      var discovery: ServiceDiscovery = null
+      val service: Serverable = new NoopTBinaryFrontendServer() {
         override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
           new NoopTBinaryFrontendService(this) {
             override val discoveryService: Option[Service] = {
-              serviceDiscovery = new KyuubiServiceDiscovery(this)
-              Some(serviceDiscovery)
+              discovery = new KyuubiServiceDiscovery(this)
+              Some(discovery)
             }
           })
       }
-      server.initialize(conf)
-      server.start()
+      service.initialize(conf)
+      service.start()
 
-      val znodeRoot = s"/$namespace"
-      withDiscoveryClient(conf) { framework =>
-        try {
-
-          assert(framework.pathNonExists("/abc"))
-          assert(framework.pathExists(znodeRoot))
-          val children = framework.getChildren(znodeRoot)
+      val basePath = s"/$namespace"
+      try {
+        withDiscoveryClient(conf) { discoveryClient =>
+          assert(discoveryClient.pathNonExists("/abc"))
+          assert(discoveryClient.pathExists(basePath))
+          val children = discoveryClient.getChildren(basePath)
           assert(children.head ===
-            s"serviceUri=${server.frontendServices.head.connectionUrl};" +
+            s"serviceUri=${service.frontendServices.head.connectionUrl};" +
             s"version=$KYUUBI_VERSION;sequence=0000000000")
 
           children.foreach { child =>
-            framework.delete(s"""$znodeRoot/$child""")
+            discoveryClient.delete(s"""$basePath/$child""")
           }
           eventually(timeout(5.seconds), interval(100.millis)) {
-            assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
-            assert(server.getServiceState === ServiceState.STOPPED)
-            val msg = s"This Kyuubi instance ${server.frontendServices.head.connectionUrl}" +
+            assert(discovery.getServiceState === ServiceState.STOPPED)
+            assert(service.getServiceState === ServiceState.STOPPED)
+            val msg = s"This Kyuubi instance ${service.frontendServices.head.connectionUrl}" +
               s" is now de-registered"
             assert(logAppender.loggingEvents.exists(
               _.getMessage.getFormattedMessage.contains(msg)))
           }
-        } finally {
-          server.stop()
-          serviceDiscovery.stop()
         }
+      } finally {
+        service.stop()
+        discovery.stop()
       }
     }
   }
 
-  test("parse host and port from instance string") {
-    val host = "127.0.0.1"
-    val port = 10009
-    val instance1 = s"$host:$port"
-    val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
-    assert(host === host1)
-    assert(port === port1)
-
-    val instance2 = s"hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=$host;" +
-      s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
-      s"hive.server2.thrift.port=$port;" +
-      s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
-    val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
-    assert(host === host2)
-    assert(port === port2)
+  test("distribute lock") {
+    val lockPath = "/lock-test"
+    val lockLatch = new CountDownLatch(1)
+
+    new Thread(() => {
+      withDiscoveryClient(conf) { discoveryClient =>
+        discoveryClient.tryWithLock(lockPath, 3000) {
+          lockLatch.countDown()
+          Thread.sleep(5000)
+        }
+      }
+    }).start()
+
+    withDiscoveryClient(conf) { discoveryClient =>
+      assert(lockLatch.await(5000, TimeUnit.MILLISECONDS))
+      val e = intercept[KyuubiSQLException] {
+        discoveryClient.tryWithLock(lockPath, 2000) {}
+      }
+      assert(e.getMessage contains s"Timeout to lock on path [$lockPath]")
+    }
   }
 }
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala
index 5010d0b68..5b8855c1e 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import io.etcd.jetcd.launcher.{Etcd, EtcdCluster}
 
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_CLIENT_CLASS
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_CLIENT_CLASS}
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
 import org.apache.kyuubi.ha.client.DiscoveryClientTests
 import org.apache.kyuubi.service.NoopTBinaryFrontendServer
@@ -38,24 +38,26 @@ class EtcdDiscoveryClientSuite extends DiscoveryClientTests {
 
   override def getConnectString: String = _connectString
 
-  val conf: KyuubiConf = {
-    KyuubiConf()
-      .set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
-  }
+  var conf: KyuubiConf = KyuubiConf()
+    .set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
 
   override def beforeAll(): Unit = {
     etcdCluster = new Etcd.Builder()
       .withNodes(2)
       .build()
     etcdCluster.start()
+    conf = new KyuubiConf()
+      .set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
+      .set(HA_ADDRESSES, getConnectString)
     super.beforeAll()
   }
 
   override def afterAll(): Unit = {
+    super.afterAll()
     if (etcdCluster != null) {
       etcdCluster.close()
+      etcdCluster = null
     }
-    super.afterAll()
   }
 
   test("etcd test: set, get and delete") {
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index dd6394b5f..565a6bcd8 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -33,32 +33,48 @@ import org.scalatest.time.SpanSugar._
 import org.apache.kyuubi.KerberizedTestHelper
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.AuthTypes
-import org.apache.kyuubi.ha.client.DiscoveryClientTests
-import org.apache.kyuubi.ha.client.EngineServiceDiscovery
+import org.apache.kyuubi.ha.client._
 import org.apache.kyuubi.service._
-import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
+import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
+import org.apache.kyuubi.zookeeper.ZookeeperConf.ZK_CLIENT_PORT
 
-class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests with KerberizedTestHelper {
+class EmbeddedZookeeperDiscoveryClientSuite extends ZookeeperDiscoveryClientSuite {
 
-  val zkServer = new EmbeddedZookeeper()
-  override val conf: KyuubiConf = KyuubiConf()
+  private var _zkServer: EmbeddedZookeeper = _
 
-  override def getConnectString: String = zkServer.getConnectString
+  override def getConnectString: String = _zkServer.getConnectString
 
-  override def beforeAll(): Unit = {
-    conf.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
-    zkServer.initialize(conf)
-    zkServer.start()
-    super.beforeAll()
+  override def startZk(): Unit = {
+    val embeddedZkConf = KyuubiConf()
+    embeddedZkConf.set(ZK_CLIENT_PORT, 0)
+    _zkServer = new EmbeddedZookeeper()
+    _zkServer.initialize(embeddedZkConf)
+    _zkServer.start()
   }
 
-  override def afterAll(): Unit = {
-    conf.unset(KyuubiConf.SERVER_KEYTAB)
-    conf.unset(KyuubiConf.SERVER_PRINCIPAL)
-    conf.unset(HA_ADDRESSES)
-    zkServer.stop()
-    super.afterAll()
+  override def stopZk(): Unit = {
+    _zkServer.stop()
+  }
+}
+
+abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests
+  with KerberizedTestHelper {
+
+  var conf: KyuubiConf = KyuubiConf()
+
+  def startZk(): Unit
+
+  def stopZk(): Unit
+
+  override def beforeEach(): Unit = {
+    startZk()
+    conf = new KyuubiConf().set(HA_ADDRESSES, getConnectString)
+    super.beforeEach()
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    stopZk()
   }
 
   test("acl for zookeeper") {
@@ -116,43 +132,33 @@ class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests with Kerberized
   }
 
   test("stop engine in time while zk ensemble terminates") {
-    val zkServer = new EmbeddedZookeeper()
-    val conf = KyuubiConf()
-      .set(ZookeeperConf.ZK_CLIENT_PORT, 0)
-    try {
-      zkServer.initialize(conf)
-      zkServer.start()
-      var serviceDiscovery: EngineServiceDiscovery = null
-      val server = new NoopTBinaryFrontendServer() {
-        override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
-          new NoopTBinaryFrontendService(this) {
-            override val discoveryService: Option[Service] = {
-              serviceDiscovery = new EngineServiceDiscovery(this)
-              Some(serviceDiscovery)
-            }
-          })
-      }
-      conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
-        .set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
-        .set(HA_ADDRESSES, zkServer.getConnectString)
-        .set(HA_ZK_SESSION_TIMEOUT, 2000)
-        .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
-      server.initialize(conf)
-      server.start()
-      assert(server.getServiceState === ServiceState.STARTED)
-
-      zkServer.stop()
-      val isServerLostM = serviceDiscovery.getClass.getSuperclass.getDeclaredField("isServerLost")
-      isServerLostM.setAccessible(true)
-      val isServerLost = isServerLostM.get(serviceDiscovery)
-
-      eventually(timeout(10.seconds), interval(100.millis)) {
-        assert(isServerLost.asInstanceOf[AtomicBoolean].get())
-        assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
-        assert(server.getServiceState === ServiceState.STOPPED)
-      }
-    } finally {
-      zkServer.stop()
+    var discovery: ServiceDiscovery = null
+    val service = new NoopTBinaryFrontendServer() {
+      override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+        new NoopTBinaryFrontendService(this) {
+          override val discoveryService: Option[Service] = {
+            discovery = new EngineServiceDiscovery(this)
+            Some(discovery)
+          }
+        })
+    }
+    conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
+      .set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
+      .set(HA_ZK_SESSION_TIMEOUT, 2000)
+      .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+    service.initialize(conf)
+    service.start()
+    assert(service.getServiceState === ServiceState.STARTED)
+
+    stopZk()
+    val isServerLostM = discovery.getClass.getSuperclass.getDeclaredField("isServerLost")
+    isServerLostM.setAccessible(true)
+    val isServerLost = isServerLostM.get(discovery)
+
+    eventually(timeout(10.seconds), interval(100.millis)) {
+      assert(isServerLost.asInstanceOf[AtomicBoolean].get())
+      assert(discovery.getServiceState === ServiceState.STOPPED)
+      assert(service.getServiceState === ServiceState.STOPPED)
     }
   }
 }