You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/13 01:32:25 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2040] [TEST] Add Zookeeper 3.4/3.5/3.6/3.7 integration test
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c24a192dc [KYUUBI #2040] [TEST] Add Zookeeper 3.4/3.5/3.6/3.7 integration test
c24a192dc is described below
commit c24a192dcb0b3c1490ee337e5f0bd37ee81f3679
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Sep 13 09:32:16 2022 +0800
[KYUUBI #2040] [TEST] Add Zookeeper 3.4/3.5/3.6/3.7 integration test
### _Why are the changes needed?_
This PR adds Testcontainers-based integration tests for Zookeeper 3.4/3.5/3.6/3.7. We need them to verify compatibility when upgrading the Zookeeper client to a new version. See details in #1941.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2040 from pan3793/zk-3-4-it.
Closes #2040
929540f6 [Cheng Pan] [TEST] Add Zookeeper 3.4/3.5/3.6 integration test
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.github/workflows/master.yml | 34 +++++
.../kyuubi-zookeeper-it}/pom.xml | 85 ++++---------
.../DockerizedZkServiceDiscoverySuite.scala | 46 +++++++
.../src/test/resources/log4j2-test.xml | 43 +++++++
integration-tests/pom.xml | 1 +
kyuubi-ha/pom.xml | 16 +++
.../apache/kyuubi/ha/client/DiscoveryClient.scala | 8 +-
...viderSuite.scala => DiscoveryClientSuite.scala} | 27 ++--
.../kyuubi/ha/client/DiscoveryClientTests.scala | 140 ++++++++++-----------
.../ha/client/etcd/EtcdDiscoveryClientSuite.scala | 14 ++-
.../zookeeper/ZookeeperDiscoveryClientSuite.scala | 118 ++++++++---------
11 files changed, 320 insertions(+), 212 deletions(-)
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 0c815b2e8..bccfec36c 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -404,3 +404,37 @@ jobs:
path: |
**/target/unit-tests.log
**/kyuubi-spark-sql-engine.log*
+
+ zookeeper-it:
+ name: Zookeeper Integration Test
+ runs-on: ubuntu-22.04
+ strategy:
+ fail-fast: false
+ matrix:
+ java: [ 8 ]
+ zookeeper: ["3.4", "3.5", "3.6", "3.7" ]
+ comment: [ "normal" ]
+ steps:
+ - uses: actions/checkout@v2
+ - name: Tune Runner VM
+ uses: ./.github/actions/tune-runner-vm
+ - name: Setup JDK ${{ matrix.java }}
+ uses: actions/setup-java@v2
+ with:
+ distribution: zulu
+ java-version: ${{ matrix.java }}
+ cache: 'maven'
+ check-latest: false
+ - name: zookeeper integration tests
+ run: |
+ export KYUUBI_IT_ZOOKEEPER_VERSION=${{ matrix.zookeeper }}
+ TEST_MODULES="integration-tests/kyuubi-zookeeper-it"
+ ./build/mvn ${MVN_OPT} -pl ${TEST_MODULES} -am clean install -DskipTests
+ ./build/mvn ${MVN_OPT} -pl ${TEST_MODULES} test
+ - name: Upload test logs
+ if: failure()
+ uses: actions/upload-artifact@v2
+ with:
+ name: unit-tests-log-java-${{ matrix.java }}-zookeeper-${{ matrix.comment }}
+ path: |
+ **/target/unit-tests.log
diff --git a/kyuubi-ha/pom.xml b/integration-tests/kyuubi-zookeeper-it/pom.xml
similarity index 57%
copy from kyuubi-ha/pom.xml
copy to integration-tests/kyuubi-zookeeper-it/pom.xml
index e7b89183f..19882efdb 100644
--- a/kyuubi-ha/pom.xml
+++ b/integration-tests/kyuubi-zookeeper-it/pom.xml
@@ -7,7 +7,7 @@
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
- ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,84 +20,35 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
+ <artifactId>integration-tests</artifactId>
<groupId>org.apache.kyuubi</groupId>
- <artifactId>kyuubi-parent</artifactId>
<version>1.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>kyuubi-ha_2.12</artifactId>
- <name>Kyuubi Project High Availability</name>
- <packaging>jar</packaging>
+ <artifactId>kyuubi-zookeeper-it_2.12</artifactId>
+ <name>Kyuubi Test Zookeeper IT</name>
<url>https://kyuubi.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.kyuubi</groupId>
- <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.etcd</groupId>
- <artifactId>jetcd-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-grpclb</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-netty</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-protobuf</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-stub</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.etcd</groupId>
- <artifactId>jetcd-launcher</artifactId>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
- <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
@@ -110,15 +61,21 @@
</dependency>
<dependency>
- <groupId>com.unboundid</groupId>
- <artifactId>unboundid-ldapsdk</artifactId>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.kyuubi</groupId>
- <artifactId>kyuubi-zookeeper_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
+ <groupId>com.dimafeng</groupId>
+ <artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/integration-tests/kyuubi-zookeeper-it/src/test/java/org/apache/kyuubi/it/zookeeper/DockerizedZkServiceDiscoverySuite.scala b/integration-tests/kyuubi-zookeeper-it/src/test/java/org/apache/kyuubi/it/zookeeper/DockerizedZkServiceDiscoverySuite.scala
new file mode 100644
index 000000000..87eecf578
--- /dev/null
+++ b/integration-tests/kyuubi-zookeeper-it/src/test/java/org/apache/kyuubi/it/zookeeper/DockerizedZkServiceDiscoverySuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.it.zookeeper
+
+import com.dimafeng.testcontainers.{GenericContainer, SingleContainer}
+import org.testcontainers.containers.wait.strategy.Wait
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClientSuite
+
+class DockerizedZkServiceDiscoverySuite extends ZookeeperDiscoveryClientSuite {
+
+ private val zkClientPort = 2181
+ private val zkVersion = sys.env.getOrElse("KYUUBI_IT_ZOOKEEPER_VERSION", "3.4")
+ private val zkImage = sys.env.getOrElse("KYUUBI_IT_ZOOKEEPER_IMAGE", s"zookeeper:$zkVersion")
+
+ val container: SingleContainer[_] = GenericContainer(
+ dockerImage = zkImage,
+ exposedPorts = Seq(zkClientPort),
+ waitStrategy = Wait.forListeningPort)
+
+ override def getConnectString: String = s"${container.host}:${container.mappedPort(zkClientPort)}"
+
+ override def startZk(): Unit = synchronized {
+ container.start()
+ }
+
+ override def stopZk(): Unit = synchronized {
+ Utils.tryLogNonFatalError { container.stop() }
+ }
+}
diff --git a/integration-tests/kyuubi-zookeeper-it/src/test/resources/log4j2-test.xml b/integration-tests/kyuubi-zookeeper-it/src/test/resources/log4j2-test.xml
new file mode 100644
index 000000000..bfc40dd6d
--- /dev/null
+++ b/integration-tests/kyuubi-zookeeper-it/src/test/resources/log4j2-test.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!-- Extra logging related to initialization of Log4j.
+ Set to debug or trace if log4j initialization is failing. -->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="stdout" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %p %c: %m%n"/>
+ <Filters>
+ <ThresholdFilter level="FATAL"/>
+ <RegexFilter regex=".*Thrift error occurred during processing of message.*" onMatch="DENY" onMismatch="NEUTRAL"/>
+ </Filters>
+ </Console>
+ <File name="file" fileName="target/unit-tests.log">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n"/>
+ <Filters>
+ <RegexFilter regex=".*Thrift error occurred during processing of message.*" onMatch="DENY" onMismatch="NEUTRAL"/>
+ </Filters>
+ </File>
+ </Appenders>
+ <Loggers>
+ <Root level="INFO">
+ <AppenderRef ref="stdout"/>
+ <AppenderRef ref="file"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index c1b80974d..145813924 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -36,6 +36,7 @@
<module>kyuubi-hive-it</module>
<module>kyuubi-trino-it</module>
<module>kyuubi-jdbc-it</module>
+ <module>kyuubi-zookeeper-it</module>
</modules>
<profiles>
diff --git a/kyuubi-ha/pom.xml b/kyuubi-ha/pom.xml
index e7b89183f..3bee82803 100644
--- a/kyuubi-ha/pom.xml
+++ b/kyuubi-ha/pom.xml
@@ -126,5 +126,21 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-test-jar</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
</build>
</project>
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
index c20799a1c..522487ef7 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
@@ -174,13 +174,13 @@ object DiscoveryClient {
private[client] def parseInstanceHostPort(instance: String): (String, Int) = {
val maybeInfos = instance.split(";")
.map(_.split("=", 2))
- .filter(_.size == 2)
+ .filter(_.length == 2)
.map(i => (i(0), i(1)))
.toMap
- if (maybeInfos.size > 0) {
+ if (maybeInfos.nonEmpty) {
(
- maybeInfos.get("hive.server2.thrift.bind.host").get,
- maybeInfos.get("hive.server2.thrift.port").get.toInt)
+ maybeInfos("hive.server2.thrift.bind.host"),
+ maybeInfos("hive.server2.thrift.port").toInt)
} else {
val strings = instance.split(":")
(strings(0), strings(1).toInt)
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientSuite.scala
similarity index 54%
rename from kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala
rename to kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientSuite.scala
index ca2a4ba88..5a1af35f6 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientSuite.scala
@@ -18,16 +18,23 @@
package org.apache.kyuubi.ha.client
import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.config.KyuubiConf
-class DiscoveryClientProviderSuite extends KyuubiFunSuite {
- test("discovery") {
- val conf = KyuubiConf()
- DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient =>
- discoveryClient.getServerHost("/kyuubi")
- }
- DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient =>
- discoveryClient.getServerHost("/kyuubi")
- }
+trait DiscoveryClientSuite extends KyuubiFunSuite {
+
+ test("parse host and port from instance string") {
+ val host = "127.0.0.1"
+ val port = 10009
+ val instance1 = s"$host:$port"
+ val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
+ assert(host === host1)
+ assert(port === port1)
+
+ val instance2 = s"hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=$host;" +
+ s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
+ s"hive.server2.thrift.port=$port;" +
+ s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
+ val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
+ assert(host === host2)
+ assert(port === port2)
}
}
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
index 4236facd7..585b51bba 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
@@ -17,141 +17,137 @@
package org.apache.kyuubi.ha.client
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
import org.scalatest.time.SpanSugar._
-import org.apache.kyuubi.KYUUBI_VERSION
-import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, KyuubiSQLException}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_AUTH_TYPE
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_NAMESPACE}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
-import org.apache.kyuubi.service.NoopTBinaryFrontendServer
-import org.apache.kyuubi.service.NoopTBinaryFrontendService
-import org.apache.kyuubi.service.Serverable
-import org.apache.kyuubi.service.Service
-import org.apache.kyuubi.service.ServiceState
+import org.apache.kyuubi.service._
trait DiscoveryClientTests extends KyuubiFunSuite {
- protected val conf: KyuubiConf
+
+ protected def conf: KyuubiConf
protected def getConnectString: String
- test("publish instance to embedded zookeeper server") {
+ test("publish instance to discovery service") {
val namespace = "kyuubiserver"
conf
- .unset(KyuubiConf.SERVER_KEYTAB)
- .unset(KyuubiConf.SERVER_PRINCIPAL)
.set(HA_ADDRESSES, getConnectString)
.set(HA_NAMESPACE, namespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- var serviceDiscovery: KyuubiServiceDiscovery = null
- val server: Serverable = new NoopTBinaryFrontendServer() {
+ var discovery: ServiceDiscovery = null
+ val service: Serverable = new NoopTBinaryFrontendServer() {
override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
new NoopTBinaryFrontendService(this) {
override val discoveryService: Option[Service] = {
- serviceDiscovery = new KyuubiServiceDiscovery(this)
- Some(serviceDiscovery)
+ discovery = new KyuubiServiceDiscovery(this)
+ Some(discovery)
}
})
}
- server.initialize(conf)
- server.start()
- val znodeRoot = s"/$namespace"
- withDiscoveryClient(conf) { framework =>
- try {
- assert(framework.pathNonExists("/abc"))
- assert(framework.pathExists(znodeRoot))
- val children = framework.getChildren(znodeRoot)
+ service.initialize(conf)
+ service.start()
+ val basePath = s"/$namespace"
+ try {
+ withDiscoveryClient(conf) { discoveryClient =>
+ assert(discoveryClient.pathNonExists("/abc"))
+ assert(discoveryClient.pathExists(basePath))
+ val children = discoveryClient.getChildren(basePath)
assert(children.head ===
- s"serviceUri=${server.frontendServices.head.connectionUrl};" +
+ s"serviceUri=${service.frontendServices.head.connectionUrl};" +
s"version=$KYUUBI_VERSION;sequence=0000000000")
children.foreach { child =>
- framework.delete(s"""$znodeRoot/$child""")
+ discoveryClient.delete(s"$basePath/$child")
}
eventually(timeout(5.seconds), interval(100.millis)) {
- assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
- assert(server.getServiceState === ServiceState.STOPPED)
+ assert(discovery.getServiceState === ServiceState.STOPPED)
+ assert(service.getServiceState === ServiceState.STOPPED)
}
- } finally {
- server.stop()
}
+ } finally {
+ service.stop()
+ discovery.stop()
}
}
- test("KYUUBI-304: Stop engine service gracefully when related zk node is deleted") {
+ test("KYUUBI #304: Stop engine service gracefully when related node is deleted") {
val logAppender = new LogAppender("test stop engine gracefully")
withLogAppender(logAppender) {
val namespace = "kyuubiengine"
conf
- .unset(KyuubiConf.SERVER_KEYTAB)
- .unset(KyuubiConf.SERVER_PRINCIPAL)
.set(HA_ADDRESSES, getConnectString)
.set(HA_NAMESPACE, namespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- .set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
- var serviceDiscovery: KyuubiServiceDiscovery = null
- val server: Serverable = new NoopTBinaryFrontendServer() {
+ var discovery: ServiceDiscovery = null
+ val service: Serverable = new NoopTBinaryFrontendServer() {
override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
new NoopTBinaryFrontendService(this) {
override val discoveryService: Option[Service] = {
- serviceDiscovery = new KyuubiServiceDiscovery(this)
- Some(serviceDiscovery)
+ discovery = new KyuubiServiceDiscovery(this)
+ Some(discovery)
}
})
}
- server.initialize(conf)
- server.start()
+ service.initialize(conf)
+ service.start()
- val znodeRoot = s"/$namespace"
- withDiscoveryClient(conf) { framework =>
- try {
-
- assert(framework.pathNonExists("/abc"))
- assert(framework.pathExists(znodeRoot))
- val children = framework.getChildren(znodeRoot)
+ val basePath = s"/$namespace"
+ try {
+ withDiscoveryClient(conf) { discoveryClient =>
+ assert(discoveryClient.pathNonExists("/abc"))
+ assert(discoveryClient.pathExists(basePath))
+ val children = discoveryClient.getChildren(basePath)
assert(children.head ===
- s"serviceUri=${server.frontendServices.head.connectionUrl};" +
+ s"serviceUri=${service.frontendServices.head.connectionUrl};" +
s"version=$KYUUBI_VERSION;sequence=0000000000")
children.foreach { child =>
- framework.delete(s"""$znodeRoot/$child""")
+ discoveryClient.delete(s"""$basePath/$child""")
}
eventually(timeout(5.seconds), interval(100.millis)) {
- assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
- assert(server.getServiceState === ServiceState.STOPPED)
- val msg = s"This Kyuubi instance ${server.frontendServices.head.connectionUrl}" +
+ assert(discovery.getServiceState === ServiceState.STOPPED)
+ assert(service.getServiceState === ServiceState.STOPPED)
+ val msg = s"This Kyuubi instance ${service.frontendServices.head.connectionUrl}" +
s" is now de-registered"
assert(logAppender.loggingEvents.exists(
_.getMessage.getFormattedMessage.contains(msg)))
}
- } finally {
- server.stop()
- serviceDiscovery.stop()
}
+ } finally {
+ service.stop()
+ discovery.stop()
}
}
}
- test("parse host and port from instance string") {
- val host = "127.0.0.1"
- val port = 10009
- val instance1 = s"$host:$port"
- val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
- assert(host === host1)
- assert(port === port1)
-
- val instance2 = s"hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=$host;" +
- s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
- s"hive.server2.thrift.port=$port;" +
- s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
- val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
- assert(host === host2)
- assert(port === port2)
+ test("distribute lock") {
+ val lockPath = "/lock-test"
+ val lockLatch = new CountDownLatch(1)
+
+ new Thread(() => {
+ withDiscoveryClient(conf) { discoveryClient =>
+ discoveryClient.tryWithLock(lockPath, 3000) {
+ lockLatch.countDown()
+ Thread.sleep(5000)
+ }
+ }
+ }).start()
+
+ withDiscoveryClient(conf) { discoveryClient =>
+ assert(lockLatch.await(5000, TimeUnit.MILLISECONDS))
+ val e = intercept[KyuubiSQLException] {
+ discoveryClient.tryWithLock(lockPath, 2000) {}
+ }
+ assert(e.getMessage contains s"Timeout to lock on path [$lockPath]")
+ }
}
}
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala
index 5010d0b68..5b8855c1e 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import io.etcd.jetcd.launcher.{Etcd, EtcdCluster}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_CLIENT_CLASS
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_CLIENT_CLASS}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.ha.client.DiscoveryClientTests
import org.apache.kyuubi.service.NoopTBinaryFrontendServer
@@ -38,24 +38,26 @@ class EtcdDiscoveryClientSuite extends DiscoveryClientTests {
override def getConnectString: String = _connectString
- val conf: KyuubiConf = {
- KyuubiConf()
- .set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
- }
+ var conf: KyuubiConf = KyuubiConf()
+ .set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
override def beforeAll(): Unit = {
etcdCluster = new Etcd.Builder()
.withNodes(2)
.build()
etcdCluster.start()
+ conf = new KyuubiConf()
+ .set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
+ .set(HA_ADDRESSES, getConnectString)
super.beforeAll()
}
override def afterAll(): Unit = {
+ super.afterAll()
if (etcdCluster != null) {
etcdCluster.close()
+ etcdCluster = null
}
- super.afterAll()
}
test("etcd test: set, get and delete") {
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index dd6394b5f..565a6bcd8 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -33,32 +33,48 @@ import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.KerberizedTestHelper
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.AuthTypes
-import org.apache.kyuubi.ha.client.DiscoveryClientTests
-import org.apache.kyuubi.ha.client.EngineServiceDiscovery
+import org.apache.kyuubi.ha.client._
import org.apache.kyuubi.service._
-import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
+import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
+import org.apache.kyuubi.zookeeper.ZookeeperConf.ZK_CLIENT_PORT
-class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests with KerberizedTestHelper {
+class EmbeddedZookeeperDiscoveryClientSuite extends ZookeeperDiscoveryClientSuite {
- val zkServer = new EmbeddedZookeeper()
- override val conf: KyuubiConf = KyuubiConf()
+ private var _zkServer: EmbeddedZookeeper = _
- override def getConnectString: String = zkServer.getConnectString
+ override def getConnectString: String = _zkServer.getConnectString
- override def beforeAll(): Unit = {
- conf.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
- zkServer.initialize(conf)
- zkServer.start()
- super.beforeAll()
+ override def startZk(): Unit = {
+ val embeddedZkConf = KyuubiConf()
+ embeddedZkConf.set(ZK_CLIENT_PORT, 0)
+ _zkServer = new EmbeddedZookeeper()
+ _zkServer.initialize(embeddedZkConf)
+ _zkServer.start()
}
- override def afterAll(): Unit = {
- conf.unset(KyuubiConf.SERVER_KEYTAB)
- conf.unset(KyuubiConf.SERVER_PRINCIPAL)
- conf.unset(HA_ADDRESSES)
- zkServer.stop()
- super.afterAll()
+ override def stopZk(): Unit = {
+ _zkServer.stop()
+ }
+}
+
+abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests
+ with KerberizedTestHelper {
+
+ var conf: KyuubiConf = KyuubiConf()
+
+ def startZk(): Unit
+
+ def stopZk(): Unit
+
+ override def beforeEach(): Unit = {
+ startZk()
+ conf = new KyuubiConf().set(HA_ADDRESSES, getConnectString)
+ super.beforeEach()
+ }
+
+ override def afterEach(): Unit = {
+ super.afterEach()
+ stopZk()
}
test("acl for zookeeper") {
@@ -116,43 +132,33 @@ class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests with Kerberized
}
test("stop engine in time while zk ensemble terminates") {
- val zkServer = new EmbeddedZookeeper()
- val conf = KyuubiConf()
- .set(ZookeeperConf.ZK_CLIENT_PORT, 0)
- try {
- zkServer.initialize(conf)
- zkServer.start()
- var serviceDiscovery: EngineServiceDiscovery = null
- val server = new NoopTBinaryFrontendServer() {
- override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
- new NoopTBinaryFrontendService(this) {
- override val discoveryService: Option[Service] = {
- serviceDiscovery = new EngineServiceDiscovery(this)
- Some(serviceDiscovery)
- }
- })
- }
- conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
- .set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
- .set(HA_ADDRESSES, zkServer.getConnectString)
- .set(HA_ZK_SESSION_TIMEOUT, 2000)
- .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- server.initialize(conf)
- server.start()
- assert(server.getServiceState === ServiceState.STARTED)
-
- zkServer.stop()
- val isServerLostM = serviceDiscovery.getClass.getSuperclass.getDeclaredField("isServerLost")
- isServerLostM.setAccessible(true)
- val isServerLost = isServerLostM.get(serviceDiscovery)
-
- eventually(timeout(10.seconds), interval(100.millis)) {
- assert(isServerLost.asInstanceOf[AtomicBoolean].get())
- assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
- assert(server.getServiceState === ServiceState.STOPPED)
- }
- } finally {
- zkServer.stop()
+ var discovery: ServiceDiscovery = null
+ val service = new NoopTBinaryFrontendServer() {
+ override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+ new NoopTBinaryFrontendService(this) {
+ override val discoveryService: Option[Service] = {
+ discovery = new EngineServiceDiscovery(this)
+ Some(discovery)
+ }
+ })
+ }
+ conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
+ .set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
+ .set(HA_ZK_SESSION_TIMEOUT, 2000)
+ .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ service.initialize(conf)
+ service.start()
+ assert(service.getServiceState === ServiceState.STARTED)
+
+ stopZk()
+ val isServerLostM = discovery.getClass.getSuperclass.getDeclaredField("isServerLost")
+ isServerLostM.setAccessible(true)
+ val isServerLost = isServerLostM.get(discovery)
+
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ assert(isServerLost.asInstanceOf[AtomicBoolean].get())
+ assert(discovery.getServiceState === ServiceState.STOPPED)
+ assert(service.getServiceState === ServiceState.STOPPED)
}
}
}