You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/07/12 03:04:43 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2644] Add etcd discovery client for HA
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 32970ce68 [KYUUBI #2644] Add etcd discovery client for HA
32970ce68 is described below
commit 32970ce685b10889f1951ce360f89e1b138d1148
Author: hongdongdong <ho...@cmss.chinamobile.com>
AuthorDate: Tue Jul 12 11:04:35 2022 +0800
[KYUUBI #2644] Add etcd discovery client for HA
### _Why are the changes needed?_
Add etcd discovery client for HA
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #2767 from hddong/etcd-support.
Closes #2644
6fa37578 [hongdongdong] Rebase and remove from rat-excludes
fc23a2b4 [hongdongdong] upgrade grpc-1.47.0
61c77d27 [hongdongdong] Fix tests
28abd13e [hongdongdong] fix comments
9e81a6a3 [hongdongdong] Rename HA_ZK_ENGINE_REF_ID to HA_ENGINE_REF_ID
aa2b4260 [hongdongdong] [KYUUBI #2644][WIP] Add etcd discovery client for HA
Authored-by: hongdongdong <ho...@cmss.chinamobile.com>
Signed-off-by: Kent Yao <ya...@apache.org>
---
LICENSE-binary | 3 +
dev/dependencyList | 32 +-
docs/deployment/settings.md | 1 +
externals/kyuubi-spark-sql-engine/pom.xml | 59 ++++
.../META-INF/services/io.grpc.LoadBalancerProvider | 20 ++
.../META-INF/services/io.grpc.NameResolverProvider | 21 ++
.../spark/EtcdShareLevelSparkEngineSuite.scala | 43 +++
...uite.scala => ShareLevelSparkEngineTests.scala} | 20 +-
.../engine/spark/WithDiscoverySparkSQLEngine.scala | 39 +--
...SQLEngine.scala => WithEmbeddedZookeeper.scala} | 36 +--
.../kyuubi/engine/spark/WithEtcdCluster.scala | 52 ++++
.../ZookeeperShareLevelSparkEngineSuite.scala | 43 +++
.../kyuubi/SparkSQLEngineDeregisterSuite.scala | 37 ++-
kyuubi-ha/pom.xml | 36 +++
.../apache/kyuubi/ha/HighAvailabilityConf.scala | 11 +-
.../ha/client/etcd/EtcdDiscoveryClient.scala | 340 +++++++++++++++++++++
.../ha/client/zookeeper/ZookeeperACLProvider.scala | 4 +-
.../client/zookeeper/ZookeeperClientProvider.scala | 4 +-
.../zookeeper/ZookeeperDiscoveryClient.scala | 4 +-
.../kyuubi/ha/client/DiscoveryClientTests.scala | 157 ++++++++++
.../ha/client/etcd/EtcdDiscoveryClientSuite.scala | 74 +++++
.../zookeeper/ZookeeperDiscoveryClientSuite.scala | 135 +-------
kyuubi-server/pom.xml | 16 +
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 4 +-
.../{EngineRefSuite.scala => EngineRefTests.scala} | 78 +----
.../kyuubi/engine/EngineRefWithEtcdSuite.scala | 54 ++++
.../engine/EngineRefWithZookeeperSuite.scala | 104 +++++++
pom.xml | 50 +++
28 files changed, 1176 insertions(+), 301 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index 2860e0640..74f18b45a 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -263,6 +263,9 @@ io.dropwizard.metrics:metrics-jmx
io.dropwizard.metrics:metrics-json
io.dropwizard.metrics:metrics-jvm
io.fabric8:kubernetes-client
+io.etcd:jetcd-core
+io.grpc:grpc-core
+io.grpc:grpc-protobuf
io.netty:netty-all
io.netty:netty-buffer
io.netty:netty-codec
diff --git a/dev/dependencyList b/dev/dependencyList
index 7f3f6a71a..5abe9992d 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -16,6 +16,8 @@
#
HikariCP/4.0.3//HikariCP-4.0.3.jar
+animal-sniffer-annotations/1.19//animal-sniffer-annotations-1.19.jar
+annotations/4.1.1.4//annotations-4.1.1.4.jar
aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
automaton/1.11-8//automaton-1.11-8.jar
classgraph/4.8.138//classgraph-4.8.138.jar
@@ -28,8 +30,19 @@ curator-client/2.12.0//curator-client-2.12.0.jar
curator-framework/2.12.0//curator-framework-2.12.0.jar
curator-recipes/2.12.0//curator-recipes-2.12.0.jar
derby/10.14.2.0//derby-10.14.2.0.jar
+error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
+failsafe/2.4.4//failsafe-2.4.4.jar
fliptables/1.0.2//fliptables-1.0.2.jar
generex/1.0.2//generex-1.0.2.jar
+grpc-api/1.47.0//grpc-api-1.47.0.jar
+grpc-context/1.47.0//grpc-context-1.47.0.jar
+grpc-core/1.47.0//grpc-core-1.47.0.jar
+grpc-grpclb/1.47.0//grpc-grpclb-1.47.0.jar
+grpc-netty/1.47.0//grpc-netty-1.47.0.jar
+grpc-protobuf-lite/1.47.0//grpc-protobuf-lite-1.47.0.jar
+grpc-protobuf/1.47.0//grpc-protobuf-1.47.0.jar
+grpc-stub/1.47.0//grpc-stub-1.47.0.jar
+gson/2.8.9//gson-2.8.9.jar
guava/30.1-jre//guava-30.1-jre.jar
hadoop-client-api/3.3.1//hadoop-client-api-3.3.1.jar
hadoop-client-runtime/3.3.1//hadoop-client-runtime-3.3.1.jar
@@ -47,6 +60,7 @@ hk2-utils/2.6.1//hk2-utils-2.6.1.jar
htrace-core4/4.1.0-incubating//htrace-core4-4.1.0-incubating.jar
httpclient/4.5.13//httpclient-4.5.13.jar
httpcore/4.4.15//httpcore-4.4.15.jar
+j2objc-annotations/1.3//j2objc-annotations-1.3.jar
jackson-annotations/2.13.3//jackson-annotations-2.13.3.jar
jackson-core/2.13.3//jackson-core-2.13.3.jar
jackson-databind/2.13.3//jackson-databind-2.13.3.jar
@@ -72,6 +86,10 @@ jersey-entity-filtering/2.36//jersey-entity-filtering-2.36.jar
jersey-hk2/2.36//jersey-hk2-2.36.jar
jersey-media-json-jackson/2.36//jersey-media-json-jackson-2.36.jar
jersey-server/2.36//jersey-server-2.36.jar
+jetcd-api/0.7.1//jetcd-api-0.7.1.jar
+jetcd-common/0.7.1//jetcd-common-0.7.1.jar
+jetcd-core/0.7.1//jetcd-core-0.7.1.jar
+jetcd-grpc/0.7.1//jetcd-grpc-0.7.1.jar
jetty-http/9.4.48.v20220622//jetty-http-9.4.48.v20220622.jar
jetty-io/9.4.48.v20220622//jetty-io-9.4.48.v20220622.jar
jetty-security/9.4.48.v20220622//jetty-security-9.4.48.v20220622.jar
@@ -115,9 +133,15 @@ metrics-json/4.2.8//metrics-json-4.2.8.jar
metrics-jvm/4.2.8//metrics-jvm-4.2.8.jar
netty-all/4.1.73.Final//netty-all-4.1.73.Final.jar
netty-buffer/4.1.73.Final//netty-buffer-4.1.73.Final.jar
+netty-codec-dns/4.1.74.Final//netty-codec-dns-4.1.74.Final.jar
+netty-codec-http/4.1.72.Final//netty-codec-http-4.1.72.Final.jar
+netty-codec-http2/4.1.72.Final//netty-codec-http2-4.1.72.Final.jar
+netty-codec-socks/4.1.72.Final//netty-codec-socks-4.1.72.Final.jar
netty-codec/4.1.73.Final//netty-codec-4.1.73.Final.jar
netty-common/4.1.73.Final//netty-common-4.1.73.Final.jar
+netty-handler-proxy/4.1.72.Final//netty-handler-proxy-4.1.72.Final.jar
netty-handler/4.1.73.Final//netty-handler-4.1.73.Final.jar
+netty-resolver-dns/4.1.74.Final//netty-resolver-dns-4.1.74.Final.jar
netty-resolver/4.1.73.Final//netty-resolver-4.1.73.Final.jar
netty-tcnative-classes/2.0.46.Final//netty-tcnative-classes-2.0.46.Final.jar
netty-transport-classes-epoll/4.1.73.Final//netty-transport-classes-epoll-4.1.73.Final.jar
@@ -126,12 +150,16 @@ netty-transport-native-epoll/4.1.73.Final/linux-aarch_64/netty-transport-native-
netty-transport-native-epoll/4.1.73.Final/linux-x86_64/netty-transport-native-epoll-4.1.73.Final-linux-x86_64.jar
netty-transport-native-kqueue/4.1.73.Final/osx-aarch_64/netty-transport-native-kqueue-4.1.73.Final-osx-aarch_64.jar
netty-transport-native-kqueue/4.1.73.Final/osx-x86_64/netty-transport-native-kqueue-4.1.73.Final-osx-x86_64.jar
-netty-transport-native-unix-common/4.1.73.Final//netty-transport-native-unix-common-4.1.73.Final.jar
+netty-transport-native-unix-common/4.1.72.Final//netty-transport-native-unix-common-4.1.72.Final.jar
netty-transport/4.1.73.Final//netty-transport-4.1.73.Final.jar
okhttp/3.12.12//okhttp-3.12.12.jar
okio/1.15.0//okio-1.15.0.jar
osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
paranamer/2.8//paranamer-2.8.jar
+perfmark-api/0.25.0//perfmark-api-0.25.0.jar
+proto-google-common-protos/2.0.1//proto-google-common-protos-2.0.1.jar
+protobuf-java-util/3.19.2//protobuf-java-util-3.19.2.jar
+protobuf-java/3.19.2//protobuf-java-3.19.2.jar
scala-library/2.12.15//scala-library-2.12.15.jar
scopt_2.12/4.1.0//scopt_2.12-4.1.0.jar
simpleclient/0.14.1//simpleclient-0.14.1.jar
@@ -149,5 +177,7 @@ swagger-core/2.2.1//swagger-core-2.2.1.jar
swagger-integration/2.2.1//swagger-integration-2.2.1.jar
swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar
swagger-models/2.2.1//swagger-models-2.2.1.jar
+vertx-core/4.2.7//vertx-core-4.2.7.jar
+vertx-grpc/4.2.7//vertx-grpc-4.2.7.jar
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
zookeeper/3.4.14//zookeeper-3.4.14.jar
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 78e4599b3..4d8d9fa6c 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -310,6 +310,7 @@ Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi.ha.addresses||The connection string for the discovery ensemble|string|1.6.0
kyuubi.ha.client.class|org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient|Class name for service discovery client.|string|1.6.0
+kyuubi.ha.etcd.lease.timeout|PT10S|Timeout for etcd keep alive lease. The kyuubi server will known unexpected loss of engine after up to this seconds.|duration|1.6.0
kyuubi.ha.namespace|kyuubi|The root directory for the service to deploy its instance uri|string|1.6.0
kyuubi.ha.zookeeper.acl.enabled|false|Set to true if the zookeeper ensemble is kerberized|boolean|1.0.0
kyuubi.ha.zookeeper.auth.digest|<undefined>|The digest auth string is used for zookeeper authentication, like: username:password.|string|1.3.2
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml
index e247367bb..56d8ceca9 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -51,6 +51,16 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
@@ -179,6 +189,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-launcher</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -192,6 +208,14 @@
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
+ <include>com.google.guava:*</include>
+ <include>com.google.protobuf:*</include>
+ <include>io.etcd:*</include>
+ <include>io.grpc:*</include>
+ <include>io.netty:*</include>
+ <include>io.perfmark:perfmark-api</include>
+ <include>io.vertx:*</include>
+ <include>net.jodah:failsafe</include>
<include>org.apache.kyuubi:kyuubi-common_${scala.binary.version}</include>
<include>org.apache.kyuubi:kyuubi-events_${scala.binary.version}</include>
<include>org.apache.kyuubi:kyuubi-ha_${scala.binary.version}</include>
@@ -249,6 +273,41 @@
<include>org.apache.thrift.**</include>
</includes>
</relocation>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>${kyuubi.shade.packageName}.com.google</shadedPattern>
+ <includes>
+ <include>com.google.**</include>
+ </includes>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>${kyuubi.shade.packageName}.io.netty</shadedPattern>
+ <includes>
+ <include>io.netty.**</include>
+ </includes>
+ </relocation>
+ <relocation>
+ <pattern>io.vertx</pattern>
+ <shadedPattern>${kyuubi.shade.packageName}.io.vertx</shadedPattern>
+ <includes>
+ <include>io.vertx.**</include>
+ </includes>
+ </relocation>
+ <relocation>
+ <pattern>net.jodah</pattern>
+ <shadedPattern>${kyuubi.shade.packageName}.net.jodah</shadedPattern>
+ <includes>
+ <include>net.jodah.**</include>
+ </includes>
+ </relocation>
+ <relocation>
+ <pattern>io.perfmark</pattern>
+ <shadedPattern>${kyuubi.shade.packageName}.io.perfmark</shadedPattern>
+ <includes>
+ <include>io.perfmark.**</include>
+ </includes>
+ </relocation>
</relocations>
</configuration>
<executions>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider
new file mode 100644
index 000000000..13c372b25
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+io.grpc.internal.PickFirstLoadBalancerProvider
+io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider
+io.grpc.grpclb.GrpclbLoadBalancerProvider
\ No newline at end of file
diff --git a/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.NameResolverProvider b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.NameResolverProvider
new file mode 100644
index 000000000..46b5718ff
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/io.grpc.NameResolverProvider
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+io.etcd.jetcd.resolver.DnsSrvResolverProvider
+io.etcd.jetcd.resolver.HttpResolverProvider
+io.etcd.jetcd.resolver.HttpsResolverProvider
+io.etcd.jetcd.resolver.IPResolverProvider
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
new file mode 100644
index 000000000..46dc3b54c
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.ShareLevel.ShareLevel
+
+trait EtcdShareLevelSparkEngineSuite
+ extends ShareLevelSparkEngineTests with WithEtcdCluster {
+ override def withKyuubiConf: Map[String, String] = {
+ super.withKyuubiConf ++
+ etcdConf ++ Map(
+ ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
+ ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
+ ENGINE_CHECK_INTERVAL.key -> "PT5s")
+ }
+}
+
+class EtcdConnectionLevelSparkEngineSuite extends EtcdShareLevelSparkEngineSuite {
+ override def shareLevel: ShareLevel = ShareLevel.CONNECTION
+}
+
+class EtcdUserLevelSparkEngineSuite extends EtcdShareLevelSparkEngineSuite {
+ override def shareLevel: ShareLevel = ShareLevel.USER
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala
similarity index 80%
rename from externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
rename to externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala
index b75800d5f..c83139592 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala
@@ -22,9 +22,6 @@ import java.util.UUID
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
import org.apache.kyuubi.operation.HiveJDBCTestHelper
@@ -34,15 +31,10 @@ import org.apache.kyuubi.service.ServiceState
* This suite is to test some behavior with spark engine in different share level.
* e.g. cleanup discovery service before stop.
*/
-abstract class ShareLevelSparkEngineSuite
+trait ShareLevelSparkEngineTests
extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper {
def shareLevel: ShareLevel
- override def withKyuubiConf: Map[String, String] = {
- super.withKyuubiConf ++ Map(
- ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
- ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
- ENGINE_CHECK_INTERVAL.key -> "PT5s")
- }
+
override protected def jdbcUrl: String = getJdbcUrl
override val namespace: String = {
// for test, we always use uuid as namespace
@@ -85,11 +77,3 @@ abstract class ShareLevelSparkEngineSuite
}
}
}
-
-class ConnectionLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {
- override def shareLevel: ShareLevel = ShareLevel.CONNECTION
-}
-
-class UserLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {
- override def shareLevel: ShareLevel = ShareLevel.USER
-}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
index 4a976c89d..7f2f06517 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
@@ -17,39 +17,16 @@
package org.apache.kyuubi.engine.spark
-import org.apache.kyuubi.Utils
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_NAMESPACE, HA_ZK_AUTH_TYPE}
-import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
import org.apache.kyuubi.ha.client.DiscoveryClient
import org.apache.kyuubi.ha.client.DiscoveryClientProvider
-import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
- private var zkServer: EmbeddedZookeeper = _
- def namespace: String
- override def withKyuubiConf: Map[String, String] = {
- assert(zkServer != null)
- Map(
- HA_ADDRESSES.key -> zkServer.getConnectString,
- HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString,
- HA_NAMESPACE.key -> namespace)
- }
- override def beforeAll(): Unit = {
- zkServer = new EmbeddedZookeeper()
- val zkData = Utils.createTempDir()
- val tmpConf = KyuubiConf()
- tmpConf.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
- tmpConf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString)
- zkServer.initialize(tmpConf)
- zkServer.start()
- }
+ def namespace: String
- override def afterAll(): Unit = {
- if (zkServer != null) {
- zkServer.stop()
- }
+ override def withKyuubiConf: Map[String, String] = {
+ Map(HA_NAMESPACE.key -> namespace)
}
override protected def beforeEach(): Unit = {
@@ -65,12 +42,4 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
DiscoveryClientProvider.withDiscoveryClient(kyuubiConf)(f)
}
-
- protected def getDiscoveryConnectionString: String = {
- if (zkServer == null) {
- ""
- } else {
- zkServer.getConnectString
- }
- }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEmbeddedZookeeper.scala
similarity index 66%
copy from externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
copy to externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEmbeddedZookeeper.scala
index 4a976c89d..7f2d6b7a7 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEmbeddedZookeeper.scala
@@ -17,23 +17,21 @@
package org.apache.kyuubi.engine.spark
+import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_NAMESPACE, HA_ZK_AUTH_TYPE}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_ZK_AUTH_TYPE}
import org.apache.kyuubi.ha.client.AuthTypes
-import org.apache.kyuubi.ha.client.DiscoveryClient
-import org.apache.kyuubi.ha.client.DiscoveryClientProvider
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
-trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
+trait WithEmbeddedZookeeper extends KyuubiFunSuite {
private var zkServer: EmbeddedZookeeper = _
- def namespace: String
- override def withKyuubiConf: Map[String, String] = {
+
+ lazy val zookeeperConf: Map[String, String] = {
assert(zkServer != null)
Map(
HA_ADDRESSES.key -> zkServer.getConnectString,
- HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString,
- HA_NAMESPACE.key -> namespace)
+ HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString)
}
override def beforeAll(): Unit = {
@@ -51,26 +49,4 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
zkServer.stop()
}
}
-
- override protected def beforeEach(): Unit = {
- super.beforeEach()
- startSparkEngine()
- }
-
- override protected def afterEach(): Unit = {
- super.afterEach()
- stopSparkEngine()
- }
-
- def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
- DiscoveryClientProvider.withDiscoveryClient(kyuubiConf)(f)
- }
-
- protected def getDiscoveryConnectionString: String = {
- if (zkServer == null) {
- ""
- } else {
- zkServer.getConnectString
- }
- }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEtcdCluster.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEtcdCluster.scala
new file mode 100644
index 000000000..690f6c9cc
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithEtcdCluster.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import scala.collection.JavaConverters._
+
+import io.etcd.jetcd.launcher.Etcd
+import io.etcd.jetcd.launcher.EtcdCluster
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_CLIENT_CLASS
+import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient
+
+trait WithEtcdCluster extends KyuubiFunSuite {
+
+ private var etcdCluster: EtcdCluster = _
+
+ lazy val etcdConf: Map[String, String] = {
+ Map(
+ HA_ADDRESSES.key -> etcdCluster.clientEndpoints().asScala.mkString(","),
+ HA_CLIENT_CLASS.key -> classOf[EtcdDiscoveryClient].getName)
+ }
+
+ override def beforeAll(): Unit = {
+ etcdCluster = new Etcd.Builder().withNodes(1).build()
+ etcdCluster.start()
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ if (etcdCluster != null) {
+ etcdCluster.close()
+ }
+ super.afterAll()
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
new file mode 100644
index 000000000..4ef96e61a
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.ShareLevel.ShareLevel
+
+trait ZookeeperShareLevelSparkEngineSuite
+ extends ShareLevelSparkEngineTests with WithEmbeddedZookeeper {
+ override def withKyuubiConf: Map[String, String] = {
+ super.withKyuubiConf ++
+ zookeeperConf ++ Map(
+ ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
+ ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
+ ENGINE_CHECK_INTERVAL.key -> "PT5s")
+ }
+}
+
+class ConnectionLevelSparkEngineSuite extends ZookeeperShareLevelSparkEngineSuite {
+ override def shareLevel: ShareLevel = ShareLevel.CONNECTION
+}
+
+class UserLevelSparkEngineSuite extends ZookeeperShareLevelSparkEngineSuite {
+ override def shareLevel: ShareLevel = ShareLevel.USER
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala
index cd13a4b6d..8dc93759b 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala
@@ -26,15 +26,18 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.sparkMajorMinorVersion
import org.apache.kyuubi.engine.spark.WithDiscoverySparkSQLEngine
+import org.apache.kyuubi.engine.spark.WithEmbeddedZookeeper
import org.apache.kyuubi.service.ServiceState
-abstract class SparkSQLEngineDeregisterSuite extends WithDiscoverySparkSQLEngine {
+abstract class SparkSQLEngineDeregisterSuite
+ extends WithDiscoverySparkSQLEngine with WithEmbeddedZookeeper {
protected val maxJobFailures: Int = 2
override def withKyuubiConf: Map[String, String] = {
- super.withKyuubiConf ++ Map(
- ANSI_ENABLED.key -> "true",
- ENGINE_DEREGISTER_JOB_MAX_FAILURES.key -> maxJobFailures.toString)
+ super.withKyuubiConf ++
+ zookeeperConf ++ Map(
+ ANSI_ENABLED.key -> "true",
+ ENGINE_DEREGISTER_JOB_MAX_FAILURES.key -> maxJobFailures.toString)
}
override val namespace: String = s"/kyuubi/deregister_test/${UUID.randomUUID().toString}"
@@ -81,22 +84,24 @@ class SparkSQLEngineDeregisterMsgSuite extends SparkSQLEngineDeregisterSuite {
}
}
-class SparkSQLEngineDeregisterExceptionTTLSuite extends WithDiscoverySparkSQLEngine {
+class SparkSQLEngineDeregisterExceptionTTLSuite
+ extends WithDiscoverySparkSQLEngine with WithEmbeddedZookeeper {
protected val maxJobFailures: Int = 2
protected val deregisterExceptionTTL = 2000
override def withKyuubiConf: Map[String, String] = {
- super.withKyuubiConf ++ Map(
- ANSI_ENABLED.key -> "true",
- ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> {
- sparkMajorMinorVersion match {
- // see https://issues.apache.org/jira/browse/SPARK-35958
- case (3, minor) if minor > 2 => "org.apache.spark.SparkArithmeticException"
- case _ => classOf[ArithmeticException].getCanonicalName
- }
- },
- ENGINE_DEREGISTER_JOB_MAX_FAILURES.key -> maxJobFailures.toString,
- ENGINE_DEREGISTER_EXCEPTION_TTL.key -> deregisterExceptionTTL.toString)
+ super.withKyuubiConf ++
+ zookeeperConf ++ Map(
+ ANSI_ENABLED.key -> "true",
+ ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> {
+ sparkMajorMinorVersion match {
+ // see https://issues.apache.org/jira/browse/SPARK-35958
+ case (3, minor) if minor > 2 => "org.apache.spark.SparkArithmeticException"
+ case _ => classOf[ArithmeticException].getCanonicalName
+ }
+ },
+ ENGINE_DEREGISTER_JOB_MAX_FAILURES.key -> maxJobFailures.toString,
+ ENGINE_DEREGISTER_EXCEPTION_TTL.key -> deregisterExceptionTTL.toString)
}
override val namespace: String = s"/kyuubi/deregister_test/${UUID.randomUUID().toString}"
diff --git a/kyuubi-ha/pom.xml b/kyuubi-ha/pom.xml
index 79750932b..4d4ea51aa 100644
--- a/kyuubi-ha/pom.xml
+++ b/kyuubi-ha/pom.xml
@@ -59,6 +59,42 @@
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-grpclb</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-launcher</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
index c47a8ffd4..f38a45ce6 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
@@ -155,7 +155,7 @@ object HighAvailabilityConf {
.checkValue(_ > 0, "Must be positive")
.createWithDefault(Duration.ofSeconds(120).toMillis)
- val HA_ZK_ENGINE_REF_ID: OptionalConfigEntry[String] =
+ val HA_ENGINE_REF_ID: OptionalConfigEntry[String] =
buildConf("kyuubi.ha.engine.ref.id")
.doc("The engine reference id will be attached to zookeeper node when engine started, " +
"and the kyuubi server will check it cyclically.")
@@ -180,4 +180,13 @@ object HighAvailabilityConf {
.version("1.5.0")
.stringConf
.createOptional
+
+ val HA_ETCD_LEASE_TIMEOUT: ConfigEntry[Long] =
+ buildConf("kyuubi.ha.etcd.lease.timeout")
+ .doc("Timeout for etcd keep alive lease. The kyuubi server will known " +
+ "unexpected loss of engine after up to this seconds.")
+ .version("1.6.0")
+ .timeConf
+ .checkValue(_ > 0, "Must be positive")
+ .createWithDefault(Duration.ofSeconds(10).toMillis)
}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
new file mode 100644
index 000000000..c9245a88e
--- /dev/null
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client.etcd
+
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.TimeoutException
+
+import com.google.common.annotations.VisibleForTesting
+import io.etcd.jetcd.ByteSequence
+import io.etcd.jetcd.Client
+import io.etcd.jetcd.KV
+import io.etcd.jetcd.Lease
+import io.etcd.jetcd.Lock
+import io.etcd.jetcd.Watch
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse
+import io.etcd.jetcd.options.DeleteOption
+import io.etcd.jetcd.options.GetOption
+import io.etcd.jetcd.options.PutOption
+import io.etcd.jetcd.watch.WatchEvent
+import io.etcd.jetcd.watch.WatchResponse
+import io.grpc.stub.StreamObserver
+
+import org.apache.kyuubi.KYUUBI_VERSION
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_INIT_TIMEOUT
+import org.apache.kyuubi.ha.HighAvailabilityConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ENGINE_REF_ID
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryPaths
+import org.apache.kyuubi.ha.client.ServiceDiscovery
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient._
+
+class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
+
+ case class ServiceNode(path: String, lease: Long)
+
+ var client: Client = _
+ var kvClient: KV = _
+ var lockClient: Lock = _
+ var leaseClient: Lease = _
+ var serviceNode: ServiceNode = _
+
+ var leaseTTL: Long = _
+
+ def createClient(): Unit = {
+ val endpoints = conf.get(HighAvailabilityConf.HA_ADDRESSES)
+ client = Client.builder.endpoints(endpoints).build
+ kvClient = client.getKVClient()
+ lockClient = client.getLockClient()
+ leaseClient = client.getLeaseClient()
+
+ leaseTTL = conf.get(HighAvailabilityConf.HA_ETCD_LEASE_TIMEOUT) / 1000
+ }
+
+ def closeClient(): Unit = {
+ if (client != null) {
+ client.close()
+ }
+ }
+
+ def create(path: String, mode: String, createParent: Boolean = true): String = {
+ // createParent can not effect here
+ mode match {
+ case "PERSISTENT" => kvClient.put(
+ ByteSequence.from(path.getBytes()),
+ ByteSequence.from(path.getBytes())).get()
+ case m => throw new KyuubiException(s"Create mode $m is not support in etcd!")
+ }
+ path
+ }
+
+ def getData(path: String): Array[Byte] = {
+ val response = kvClient.get(ByteSequence.from(path.getBytes())).get()
+ if (response.getKvs.isEmpty) {
+ throw new KyuubiException(s"Key[$path] not exists in ETCD, please check it.")
+ } else {
+ response.getKvs.get(0).getValue.getBytes
+ }
+ }
+
+ def getChildren(path: String): List[String] = {
+ val kvs = kvClient.get(
+ ByteSequence.from(path.getBytes()),
+ GetOption.newBuilder().isPrefix(true).build()).get().getKvs
+ if (kvs.isEmpty) {
+ List.empty
+ } else {
+ kvs.asScala.map(kv => kv.getKey.toString(UTF_8).stripPrefix(path).stripPrefix("/"))
+ .filter(key => key.nonEmpty && !key.startsWith("lock")).toList
+ }
+ }
+
+ def pathExists(path: String): Boolean = {
+ !pathNonExists(path)
+ }
+
+ def pathNonExists(path: String): Boolean = {
+ kvClient.get(ByteSequence.from(path.getBytes())).get().getKvs.isEmpty
+ }
+
+ def delete(path: String, deleteChildren: Boolean = false): Unit = {
+ kvClient.delete(
+ ByteSequence.from(path.getBytes()),
+ DeleteOption.newBuilder().isPrefix(deleteChildren).build()).get()
+ }
+
+ def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
+ // not need with etcd
+ }
+
+ def tryWithLock[T](
+ lockPath: String,
+ timeout: Long)(f: => T): T = {
+ // the default unit is millis, covert to seconds.
+ // add more 3 second for leaseTime to make client fast fail
+ val leaseTime = timeout / 1000 + 3
+ // if the lease expires, the lock is automatically released.
+ val leaseId = leaseClient.grant(leaseTime).get().getID()
+ try {
+ try {
+ // Acquire a lease. If no leases are available, this method blocks until either the
+ // maximum number of leases is increased or another client/process closes a lease
+
+ // will throw TimeoutException when we are get lock timeout
+ lockClient.lock(ByteSequence.from(lockPath.getBytes()), leaseId)
+ .get(timeout, TimeUnit.MILLISECONDS)
+ } catch {
+ case _: TimeoutException =>
+ throw KyuubiSQLException(s"Timeout to lock on path [$lockPath] after " +
+ s"$timeout ms. There would be some problem that other session may " +
+ s"create engine timeout.")
+ case e: Exception =>
+ throw new KyuubiException(s"Lock failed on path [$lockPath]", e)
+ }
+ f
+ } finally {
+ try {
+ lockClient.unlock(ByteSequence.from(lockPath.getBytes())).get()
+ leaseClient.revoke(leaseId).get()
+ } catch {
+ case e: Exception => throw new KyuubiException(e.getMessage, e.getCause)
+ }
+ }
+ }
+
+ def getServerHost(namespace: String): Option[(String, Int)] = {
+ // TODO: use last one because to avoid touching some maybe-crashed engines
+ // We need a big improvement here.
+ getServiceNodesInfo(namespace, Some(1), silent = true) match {
+ case Seq(sn) => Some((sn.host, sn.port))
+ case _ => None
+ }
+ }
+
+ def getEngineByRefId(
+ namespace: String,
+ engineRefId: String): Option[(String, Int)] = {
+ getServiceNodesInfo(namespace, silent = true)
+ .find(_.engineRefId.exists(_.equals(engineRefId)))
+ .map(data => (data.host, data.port))
+ }
+
+ def getServiceNodesInfo(
+ namespace: String,
+ sizeOpt: Option[Int] = None,
+ silent: Boolean = false): Seq[ServiceNodeInfo] = {
+ try {
+ val hosts = getChildren(DiscoveryPaths.makePath(null, namespace))
+ val size = sizeOpt.getOrElse(hosts.size)
+ hosts.takeRight(size).map { p =>
+ val path = DiscoveryPaths.makePath(namespace, p)
+ val instance = new String(getData(path), UTF_8)
+ val (host, port) = DiscoveryClient.parseInstanceHostPort(instance)
+ val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
+ val engineRefId = p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId="))
+ info(s"Get service instance:$instance and version:$version under $namespace")
+ ServiceNodeInfo(namespace, p, host, port, version, engineRefId)
+ }
+ } catch {
+ case _: Exception if silent => Nil
+ case e: Exception =>
+ error(s"Failed to get service node info", e)
+ Nil
+ }
+ }
+
+ def registerService(
+ conf: KyuubiConf,
+ namespace: String,
+ serviceDiscovery: ServiceDiscovery,
+ version: Option[String] = None,
+ external: Boolean = false): Unit = {
+ val instance = serviceDiscovery.fe.connectionUrl
+ val watcher = new DeRegisterWatcher(instance, serviceDiscovery)
+
+ val serviceNode = createPersistentNode(conf, namespace, instance, version, external)
+
+ client.getWatchClient.watch(ByteSequence.from(serviceNode.path.getBytes()), watcher)
+
+ if (pathNonExists(serviceNode.path)) {
+ // No node exists, throw exception
+ throw new KyuubiException(s"Unable to create keyValue for this Kyuubi " +
+ s"instance[${instance}] on ETCD.")
+ }
+ }
+
+ def deregisterService(): Unit = {
+ // close the EPHEMERAL_SEQUENTIAL node in etcd
+ if (serviceNode != null) {
+ if (serviceNode.lease != LEASE_NULL_VALUE) {
+ client.getLeaseClient.revoke(serviceNode.lease)
+ delete(serviceNode.path)
+ }
+ serviceNode = null
+ }
+ }
+
+ def postDeregisterService(namespace: String): Boolean = {
+ if (namespace != null) {
+ delete(DiscoveryPaths.makePath(null, namespace), true)
+ true
+ } else {
+ false
+ }
+ }
+
+ def createAndGetServiceNode(
+ conf: KyuubiConf,
+ namespace: String,
+ instance: String,
+ version: Option[String] = None,
+ external: Boolean = false): String = {
+ createPersistentNode(conf, namespace, instance, version, external).path
+ }
+
+ @VisibleForTesting
+ def startSecretNode(
+ createMode: String,
+ basePath: String,
+ initData: String,
+ useProtection: Boolean = false): Unit = {
+ client.getKVClient.put(
+ ByteSequence.from(basePath.getBytes()),
+ ByteSequence.from(initData.getBytes())).get()
+ }
+
+ private def createPersistentNode(
+ conf: KyuubiConf,
+ namespace: String,
+ instance: String,
+ version: Option[String] = None,
+ external: Boolean = false): ServiceNode = {
+ val ns = DiscoveryPaths.makePath(null, namespace)
+ create(ns, "PERSISTENT")
+
+ val session = conf.get(HA_ENGINE_REF_ID)
+ .map(refId => s"refId=$refId;").getOrElse("")
+ val pathPrefix = DiscoveryPaths.makePath(
+ namespace,
+ s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
+ val znode = instance
+
+ var leaseId: Long = LEASE_NULL_VALUE
+ var realPath: String = null
+ // Use the same of engine init timeout
+ val timeout = conf.get(ENGINE_INIT_TIMEOUT)
+ // lock to get instance sequence
+ tryWithLock(s"$ns$LOCK_PATH_SUFFIX", timeout) {
+ val instances = getChildren(pathPrefix).map(_.stripPrefix(pathPrefix).toLong)
+ val sequence: Long = if (instances.isEmpty) 0 else instances.max + 1
+ realPath = s"$pathPrefix${"%010d".format(sequence)}"
+
+ if (external) {
+ client.getKVClient.put(
+ ByteSequence.from(realPath.getBytes()),
+ ByteSequence.from(znode.getBytes())).get()
+ } else {
+ leaseId = client.getLeaseClient.grant(leaseTTL).get().getID
+ client.getLeaseClient.keepAlive(
+ leaseId,
+ new StreamObserver[LeaseKeepAliveResponse] {
+ override def onNext(v: LeaseKeepAliveResponse): Unit = Unit // do nothing
+
+ override def onError(throwable: Throwable): Unit = Unit // do nothing
+
+ override def onCompleted(): Unit = Unit // do nothing
+ })
+ client.getKVClient.put(
+ ByteSequence.from(realPath.getBytes()),
+ ByteSequence.from(znode.getBytes()),
+ PutOption.newBuilder().withLeaseId(leaseId).build()).get()
+ }
+ }
+ ServiceNode(realPath, leaseId)
+ }
+
+ class DeRegisterWatcher(instance: String, serviceDiscovery: ServiceDiscovery)
+ extends Watch.Listener {
+
+ override def onNext(watchResponse: WatchResponse): Unit = {
+ watchResponse.getEvents.asScala
+ .filter(_.getEventType == WatchEvent.EventType.DELETE).foreach(_ => {
+ warn(s"This Kyuubi instance ${instance} is now de-registered from" +
+ s" ETCD. The server will be shut down after the last client session completes.")
+ serviceDiscovery.stopGracefully()
+ })
+ }
+
+ override def onError(throwable: Throwable): Unit =
+ throw new KyuubiException(throwable.getMessage, throwable.getCause)
+
+ override def onCompleted(): Unit = Unit
+ }
+}
+
+object EtcdDiscoveryClient {
+ final private val LEASE_NULL_VALUE: Long = -1
+ final private[etcd] val LOCK_PATH_SUFFIX = "/lock"
+}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
index 40ead4633..467c323b7 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
@@ -42,9 +42,9 @@ class ZookeeperACLProvider(conf: KyuubiConf) extends ACLProvider {
nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
}
- if (conf.get(HighAvailabilityConf.HA_ZK_ENGINE_REF_ID).isEmpty && enabledServerAcls()) {
+ if (conf.get(HighAvailabilityConf.HA_ENGINE_REF_ID).isEmpty && enabledServerAcls()) {
addACL()
- } else if (conf.get(HighAvailabilityConf.HA_ZK_ENGINE_REF_ID).nonEmpty && enabledEngineAcls()) {
+ } else if (conf.get(HighAvailabilityConf.HA_ENGINE_REF_ID).nonEmpty && enabledEngineAcls()) {
addACL()
} else {
// ACLs for znodes on a non-kerberized cluster
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
index 5ea121e79..0cfc3af4a 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
@@ -119,10 +119,10 @@ object ZookeeperClientProvider extends Logging {
}
}
- if (conf.get(HA_ZK_ENGINE_REF_ID).isEmpty
+ if (conf.get(HA_ENGINE_REF_ID).isEmpty
&& AuthTypes.withName(conf.get(HA_ZK_AUTH_TYPE)) == AuthTypes.KERBEROS) {
setupZkAuth()
- } else if (conf.get(HA_ZK_ENGINE_REF_ID).nonEmpty && AuthTypes
+ } else if (conf.get(HA_ENGINE_REF_ID).nonEmpty && AuthTypes
.withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == AuthTypes.KERBEROS) {
setupZkAuth()
}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
index d1ce32685..5b6beaa5d 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -46,7 +46,7 @@ import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ENGINE_REF_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLISH_CONFIGS
import org.apache.kyuubi.ha.client.DiscoveryClient
@@ -344,7 +344,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
throw new KyuubiException(s"Failed to create namespace '$ns'", e)
}
- val session = conf.get(HA_ZK_ENGINE_REF_ID)
+ val session = conf.get(HA_ENGINE_REF_ID)
.map(refId => s"refId=$refId;").getOrElse("")
val pathPrefix = ZKPaths.makePath(
namespace,
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
new file mode 100644
index 000000000..41bc92f64
--- /dev/null
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.kyuubi.KYUUBI_VERSION
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_AUTH_TYPE
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+import org.apache.kyuubi.service.NoopTBinaryFrontendServer
+import org.apache.kyuubi.service.NoopTBinaryFrontendService
+import org.apache.kyuubi.service.Serverable
+import org.apache.kyuubi.service.Service
+import org.apache.kyuubi.service.ServiceState
+
+trait DiscoveryClientTests extends KyuubiFunSuite {
+ protected val conf: KyuubiConf
+
+ protected def getConnectString(): String
+
+ test("publish instance to embedded zookeeper server") {
+ val namespace = "kyuubiserver"
+
+ conf
+ .unset(KyuubiConf.SERVER_KEYTAB)
+ .unset(KyuubiConf.SERVER_PRINCIPAL)
+ .set(HA_ADDRESSES, getConnectString)
+ .set(HA_NAMESPACE, namespace)
+ .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+
+ var serviceDiscovery: KyuubiServiceDiscovery = null
+ val server: Serverable = new NoopTBinaryFrontendServer() {
+ override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+ new NoopTBinaryFrontendService(this) {
+ override val discoveryService: Option[Service] = {
+ serviceDiscovery = new KyuubiServiceDiscovery(this)
+ Some(serviceDiscovery)
+ }
+ })
+ }
+ server.initialize(conf)
+ server.start()
+ val znodeRoot = s"/$namespace"
+ withDiscoveryClient(conf) { framework =>
+ try {
+ assert(framework.pathNonExists("/abc"))
+ assert(framework.pathExists(znodeRoot))
+ val children = framework.getChildren(znodeRoot)
+ assert(children.head ===
+ s"serviceUri=${server.frontendServices.head.connectionUrl};" +
+ s"version=$KYUUBI_VERSION;sequence=0000000000")
+
+ children.foreach { child =>
+ framework.delete(s"""$znodeRoot/$child""")
+ }
+ eventually(timeout(5.seconds), interval(100.millis)) {
+ assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
+ assert(server.getServiceState === ServiceState.STOPPED)
+ }
+ } finally {
+ server.stop()
+ }
+ }
+ }
+
+ test("KYUUBI-304: Stop engine service gracefully when related zk node is deleted") {
+ val logAppender = new LogAppender("test stop engine gracefully")
+ withLogAppender(logAppender) {
+ val namespace = "kyuubiengine"
+
+ conf
+ .unset(KyuubiConf.SERVER_KEYTAB)
+ .unset(KyuubiConf.SERVER_PRINCIPAL)
+ .set(HA_ADDRESSES, getConnectString())
+ .set(HA_NAMESPACE, namespace)
+ .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ .set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
+
+ var serviceDiscovery: KyuubiServiceDiscovery = null
+ val server: Serverable = new NoopTBinaryFrontendServer() {
+ override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+ new NoopTBinaryFrontendService(this) {
+ override val discoveryService: Option[Service] = {
+ serviceDiscovery = new KyuubiServiceDiscovery(this)
+ Some(serviceDiscovery)
+ }
+ })
+ }
+ server.initialize(conf)
+ server.start()
+
+ val znodeRoot = s"/$namespace"
+ withDiscoveryClient(conf) { framework =>
+ try {
+
+ assert(framework.pathNonExists("/abc"))
+ assert(framework.pathExists(znodeRoot))
+ val children = framework.getChildren(znodeRoot)
+ assert(children.head ===
+ s"serviceUri=${server.frontendServices.head.connectionUrl};" +
+ s"version=$KYUUBI_VERSION;sequence=0000000000")
+
+ children.foreach { child =>
+ framework.delete(s"""$znodeRoot/$child""")
+ }
+ eventually(timeout(5.seconds), interval(100.millis)) {
+ assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
+ assert(server.getServiceState === ServiceState.STOPPED)
+ val msg = s"This Kyuubi instance ${server.frontendServices.head.connectionUrl}" +
+ s" is now de-registered"
+ assert(logAppender.loggingEvents.exists(
+ _.getMessage.getFormattedMessage.contains(msg)))
+ }
+ } finally {
+ server.stop()
+ serviceDiscovery.stop()
+ }
+ }
+ }
+ }
+
+ test("parse host and port from instance string") {
+ val host = "127.0.0.1"
+ val port = 10009
+ val instance1 = s"$host:$port"
+ val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
+ assert(host === host1)
+ assert(port === port1)
+
+ val instance2 = s"hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=$host;" +
+ s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
+ s"hive.server2.thrift.port=$port;" +
+ s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
+ val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
+ assert(host === host2)
+ assert(port === port2)
+ }
+}
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala
new file mode 100644
index 000000000..b0de297dc
--- /dev/null
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client.etcd
+
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import io.etcd.jetcd.launcher.Etcd
+import io.etcd.jetcd.launcher.EtcdCluster
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_CLIENT_CLASS
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryClientTests
+import org.apache.kyuubi.service.NoopTBinaryFrontendServer
+
+class EtcdDiscoveryClientSuite extends DiscoveryClientTests {
+ private var etcdCluster: EtcdCluster = _
+ var engineServer: NoopTBinaryFrontendServer = _
+
+ private lazy val _connectString = etcdCluster.clientEndpoints().asScala.mkString(",")
+
+ override def getConnectString(): String = _connectString
+
+ val conf: KyuubiConf = {
+ KyuubiConf()
+ .set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
+ }
+
+ override def beforeAll(): Unit = {
+ etcdCluster = new Etcd.Builder().withNodes(1).build()
+ etcdCluster.start()
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ if (etcdCluster != null) {
+ etcdCluster.close()
+ }
+ super.afterAll()
+ }
+
+ test("etcd test: set, get and delete") {
+ withDiscoveryClient(conf) { discoveryClient =>
+ val path = "/kyuubi"
+ // set
+ discoveryClient.create(path, "PERSISTENT")
+ assert(discoveryClient.pathExists(path))
+
+ // get
+ assert(new String(discoveryClient.getData(path), StandardCharsets.UTF_8) == path)
+
+ // delete
+ discoveryClient.delete(path)
+ assert(!discoveryClient.pathExists(path))
+ }
+ }
+}
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index afa744324..8b87d4aad 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -30,22 +30,21 @@ import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.ACL
import org.scalatest.time.SpanSugar._
-import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION}
+import org.apache.kyuubi.KerberizedTestHelper
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.AuthTypes
-import org.apache.kyuubi.ha.client.DiscoveryClient
-import org.apache.kyuubi.ha.client.DiscoveryClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryClientTests
import org.apache.kyuubi.ha.client.EngineServiceDiscovery
-import org.apache.kyuubi.ha.client.KyuubiServiceDiscovery
import org.apache.kyuubi.service._
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
-class ZookeeperDiscoveryClientSuite extends KerberizedTestHelper {
- import DiscoveryClientProvider._
+class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests with KerberizedTestHelper {
val zkServer = new EmbeddedZookeeper()
- val conf: KyuubiConf = KyuubiConf()
+ override val conf: KyuubiConf = KyuubiConf()
+
+ override def getConnectString(): String = zkServer.getConnectString
override def beforeAll(): Unit = {
conf.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
@@ -62,51 +61,6 @@ class ZookeeperDiscoveryClientSuite extends KerberizedTestHelper {
super.afterAll()
}
- test("publish instance to embedded zookeeper server") {
- val namespace = "kyuubiserver"
-
- conf
- .unset(KyuubiConf.SERVER_KEYTAB)
- .unset(KyuubiConf.SERVER_PRINCIPAL)
- .set(HA_ADDRESSES, zkServer.getConnectString)
- .set(HA_NAMESPACE, namespace)
- .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
-
- var serviceDiscovery: KyuubiServiceDiscovery = null
- val server: Serverable = new NoopTBinaryFrontendServer() {
- override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
- new NoopTBinaryFrontendService(this) {
- override val discoveryService: Option[Service] = {
- serviceDiscovery = new KyuubiServiceDiscovery(this)
- Some(serviceDiscovery)
- }
- })
- }
- server.initialize(conf)
- server.start()
- val znodeRoot = s"/$namespace"
- withDiscoveryClient(conf) { framework =>
- try {
- assert(framework.pathNonExists("/abc"))
- assert(framework.pathExists(znodeRoot))
- val children = framework.getChildren(znodeRoot)
- assert(children.head ===
- s"serviceUri=${server.frontendServices.head.connectionUrl};" +
- s"version=$KYUUBI_VERSION;sequence=0000000000")
-
- children.foreach { child =>
- framework.delete(s"""$znodeRoot/$child""")
- }
- eventually(timeout(5.seconds), interval(100.millis)) {
- assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
- assert(server.getServiceState === ServiceState.STOPPED)
- }
- } finally {
- server.stop()
- }
- }
- }
-
test("acl for zookeeper") {
val expectedNoACL = new util.ArrayList[ACL](ZooDefs.Ids.OPEN_ACL_UNSAFE)
val expectedEnableACL = new util.ArrayList[ACL](ZooDefs.Ids.READ_ACL_UNSAFE)
@@ -124,12 +78,12 @@ class ZookeeperDiscoveryClientSuite extends KerberizedTestHelper {
val serverACL = new ZookeeperACLProvider(serverConf).getDefaultAcl
assertACL(expectedEnableACL, serverACL)
- val engineConf = serverConf.clone.set(HA_ZK_ENGINE_REF_ID, "ref")
+ val engineConf = serverConf.clone.set(HA_ENGINE_REF_ID, "ref")
engineConf.set(HA_ZK_ENGINE_AUTH_TYPE, AuthTypes.NONE.toString)
val engineACL = new ZookeeperACLProvider(engineConf).getDefaultAcl
assertACL(expectedNoACL, engineACL)
- val enableEngineACLConf = serverConf.clone.set(HA_ZK_ENGINE_REF_ID, "ref")
+ val enableEngineACLConf = serverConf.clone.set(HA_ENGINE_REF_ID, "ref")
enableEngineACLConf.set(HA_ZK_ENGINE_AUTH_TYPE, AuthTypes.KERBEROS.toString)
val enableEngineACL = new ZookeeperACLProvider(enableEngineACLConf).getDefaultAcl
assertACL(expectedEnableACL, enableEngineACL)
@@ -161,79 +115,6 @@ class ZookeeperDiscoveryClientSuite extends KerberizedTestHelper {
}
}
- test("KYUUBI-304: Stop engine service gracefully when related zk node is deleted") {
- val logAppender = new LogAppender("test stop engine gracefully")
- withLogAppender(logAppender) {
- val namespace = "kyuubiengine"
-
- conf
- .unset(KyuubiConf.SERVER_KEYTAB)
- .unset(KyuubiConf.SERVER_PRINCIPAL)
- .set(HA_ADDRESSES, zkServer.getConnectString)
- .set(HA_NAMESPACE, namespace)
- .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- .set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
-
- var serviceDiscovery: KyuubiServiceDiscovery = null
- val server: Serverable = new NoopTBinaryFrontendServer() {
- override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
- new NoopTBinaryFrontendService(this) {
- override val discoveryService: Option[Service] = {
- serviceDiscovery = new KyuubiServiceDiscovery(this)
- Some(serviceDiscovery)
- }
- })
- }
- server.initialize(conf)
- server.start()
-
- val znodeRoot = s"/$namespace"
- withDiscoveryClient(conf) { framework =>
- try {
-
- assert(framework.pathNonExists("/abc"))
- assert(framework.pathExists(znodeRoot))
- val children = framework.getChildren(znodeRoot)
- assert(children.head ===
- s"serviceUri=${server.frontendServices.head.connectionUrl};" +
- s"version=$KYUUBI_VERSION;sequence=0000000000")
-
- children.foreach { child =>
- framework.delete(s"""$znodeRoot/$child""")
- }
- eventually(timeout(5.seconds), interval(100.millis)) {
- assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
- assert(server.getServiceState === ServiceState.STOPPED)
- val msg = s"This Kyuubi instance ${server.frontendServices.head.connectionUrl}" +
- s" is now de-registered"
- assert(logAppender.loggingEvents.exists(
- _.getMessage.getFormattedMessage.contains(msg)))
- }
- } finally {
- server.stop()
- serviceDiscovery.stop()
- }
- }
- }
- }
-
- test("parse host and port from instance string") {
- val host = "127.0.0.1"
- val port = 10009
- val instance1 = s"$host:$port"
- val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
- assert(host === host1)
- assert(port === port1)
-
- val instance2 = s"hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=$host;" +
- s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
- s"hive.server2.thrift.port=$port;" +
- s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
- val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
- assert(host === host2)
- assert(port === port2)
- }
-
test("stop engine in time while zk ensemble terminates") {
val zkServer = new EmbeddedZookeeper()
val conf = KyuubiConf()
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 9c04d2294..c5013db44 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -277,6 +277,16 @@
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-sql-engine_${scala.binary.version}</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </exclusion>
+ </exclusions>
<scope>test</scope>
</dependency>
@@ -461,6 +471,12 @@
<artifactId>scala-compiler</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-launcher</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 34acd5889..6cbd1ff94 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -35,7 +35,7 @@ import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
import org.apache.kyuubi.engine.hive.HiveProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_NAMESPACE, HA_ZK_ENGINE_REF_ID}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths}
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
@@ -167,7 +167,7 @@ private[kyuubi] class EngineRef(
if (engineRef.nonEmpty) return engineRef.get
conf.set(HA_NAMESPACE, engineSpace)
- conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
+ conf.set(HA_ENGINE_REF_ID, engineRefId)
val started = System.currentTimeMillis()
conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
builder = engineType match {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
similarity index 81%
rename from kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
rename to kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
index 096ccc656..538a36805 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
@@ -23,32 +23,28 @@ import java.util.concurrent.Executors
import org.apache.hadoop.security.UserGroupInformation
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
-import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, Utils}
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
+import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.EngineType._
+import org.apache.kyuubi.engine.ShareLevel._
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.DiscoveryClientProvider
import org.apache.kyuubi.ha.client.DiscoveryPaths
import org.apache.kyuubi.metrics.MetricsConstants.ENGINE_TOTAL
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.util.NamedThreadFactory
-import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
-
-class EngineRefSuite extends KyuubiFunSuite {
- import EngineType._
- import ShareLevel._
- private val zkServer = new EmbeddedZookeeper
- private val conf = KyuubiConf()
- private val user = Utils.currentUser
+
+trait EngineRefTests extends KyuubiFunSuite {
+
+ protected val user = Utils.currentUser
private val metricsSystem = new MetricsSystem
+ protected val conf: KyuubiConf
+ protected def getConnectString(): String
+
override def beforeAll(): Unit = {
- val zkData = Utils.createTempDir()
- conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString)
- .set(ZookeeperConf.ZK_CLIENT_PORT, 0)
- .set("spark.sql.catalogImplementation", "in-memory")
- zkServer.initialize(conf)
- zkServer.start()
metricsSystem.initialize(conf)
metricsSystem.start()
super.beforeAll()
@@ -56,7 +52,6 @@ class EngineRefSuite extends KyuubiFunSuite {
override def afterAll(): Unit = {
metricsSystem.stop()
- zkServer.stop()
super.afterAll()
}
@@ -222,7 +217,7 @@ class EngineRefSuite extends KyuubiFunSuite {
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test")
- conf.set(HighAvailabilityConf.HA_ADDRESSES, zkServer.getConnectString)
+ conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString())
val engine = new EngineRef(conf, user, id, null)
var port1 = 0
@@ -257,53 +252,6 @@ class EngineRefSuite extends KyuubiFunSuite {
}
}
- // KYUUBI #2827 remove all engines dependencies except to spark from server
- ignore("different engine type should use its own lock") {
- conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
- conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
- conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test1")
- conf.set(HighAvailabilityConf.HA_ADDRESSES, zkServer.getConnectString)
- val conf1 = conf.clone
- conf1.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
- val conf2 = conf.clone
- conf2.set(KyuubiConf.ENGINE_TYPE, HIVE_SQL.toString)
-
- val start = System.currentTimeMillis()
- val times = new Array[Long](2)
- val executor = Executors.newFixedThreadPool(2)
- try {
- executor.execute(() => {
- DiscoveryClientProvider.withDiscoveryClient(conf1) { client =>
- try {
- new EngineRef(conf1, user, UUID.randomUUID().toString, null)
- .getOrCreate(client)
- } finally {
- times(0) = System.currentTimeMillis()
- }
- }
- })
- executor.execute(() => {
- DiscoveryClientProvider.withDiscoveryClient(conf2) { client =>
- try {
- new EngineRef(conf2, user, UUID.randomUUID().toString, null)
- .getOrCreate(client)
- } finally {
- times(1) = System.currentTimeMillis()
- }
- }
- })
-
- eventually(timeout(10.seconds), interval(200.milliseconds)) {
- assert(times.forall(_ > start))
- // ENGINE_INIT_TIMEOUT is 3000ms
- assert(times.max - times.min < 2500)
- }
- } finally {
- executor.shutdown()
- }
- }
-
test("three same lock request with initialization timeout") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
@@ -311,7 +259,7 @@ class EngineRefSuite extends KyuubiFunSuite {
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test2")
- conf.set(HighAvailabilityConf.HA_ADDRESSES, zkServer.getConnectString)
+ conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString())
val beforeEngines = MetricsSystem.counterValue(ENGINE_TOTAL).getOrElse(0L)
val start = System.currentTimeMillis()
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithEtcdSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithEtcdSuite.scala
new file mode 100644
index 000000000..80bb09e20
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithEtcdSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import scala.collection.JavaConverters._
+
+import io.etcd.jetcd.launcher.Etcd
+import io.etcd.jetcd.launcher.EtcdCluster
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_CLIENT_CLASS
+import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient
+
+class EngineRefWithEtcdSuite extends EngineRefTests {
+
+ private var etcdCluster: EtcdCluster = _
+ private lazy val _connectString: String = etcdCluster.clientEndpoints().asScala.mkString(",")
+
+ override protected val conf: KyuubiConf = {
+ KyuubiConf()
+ .set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
+ .set("spark.sql.catalogImplementation", "in-memory")
+ }
+
+ override protected def getConnectString(): String = _connectString
+
+ override def beforeAll(): Unit = {
+ etcdCluster = new Etcd.Builder().withNodes(1).build()
+ etcdCluster.start()
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ if (etcdCluster != null) {
+ etcdCluster.close()
+ }
+ super.afterAll()
+ }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithZookeeperSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithZookeeperSuite.scala
new file mode 100644
index 000000000..75dd5cc09
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithZookeeperSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import java.util.UUID
+import java.util.concurrent.Executors
+
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.EngineType.HIVE_SQL
+import org.apache.kyuubi.engine.EngineType.SPARK_SQL
+import org.apache.kyuubi.engine.ShareLevel.USER
+import org.apache.kyuubi.ha.HighAvailabilityConf
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
+import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
+import org.apache.kyuubi.zookeeper.ZookeeperConf
+
+class EngineRefWithZookeeperSuite extends EngineRefTests {
+
+ private val zkServer = new EmbeddedZookeeper
+ val conf = KyuubiConf()
+
+ override def beforeAll(): Unit = {
+ val zkData = Utils.createTempDir()
+ conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString)
+ .set(ZookeeperConf.ZK_CLIENT_PORT, 0)
+ .set("spark.sql.catalogImplementation", "in-memory")
+ zkServer.initialize(conf)
+ zkServer.start()
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ zkServer.stop()
+ super.afterAll()
+ }
+
+ def getConnectString(): String = zkServer.getConnectString
+
+ // TODO mvoe to EngineRefTests when etcd discovery support more engines
+ // KYUUBI #2827 remove all engines dependencies except to spark from server
+ ignore("different engine type should use its own lock") {
+ conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
+ conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
+ conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test1")
+ conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString())
+ val conf1 = conf.clone
+ conf1.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+ val conf2 = conf.clone
+ conf2.set(KyuubiConf.ENGINE_TYPE, HIVE_SQL.toString)
+
+ val start = System.currentTimeMillis()
+ val times = new Array[Long](2)
+ val executor = Executors.newFixedThreadPool(2)
+ try {
+ executor.execute(() => {
+ DiscoveryClientProvider.withDiscoveryClient(conf1) { client =>
+ try {
+ new EngineRef(conf1, user, UUID.randomUUID().toString, null)
+ .getOrCreate(client)
+ } finally {
+ times(0) = System.currentTimeMillis()
+ }
+ }
+ })
+ executor.execute(() => {
+ DiscoveryClientProvider.withDiscoveryClient(conf2) { client =>
+ try {
+ new EngineRef(conf2, user, UUID.randomUUID().toString, null)
+ .getOrCreate(client)
+ } finally {
+ times(1) = System.currentTimeMillis()
+ }
+ }
+ })
+
+ eventually(timeout(10.seconds), interval(200.milliseconds)) {
+ assert(times.forall(_ > start))
+ // ENGINE_INIT_TIMEOUT is 3000ms
+ assert(times.max - times.min < 2500)
+ }
+ } finally {
+ executor.shutdown()
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 40b38bc7d..155141868 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,6 +111,7 @@
<commons-lang.version>2.6</commons-lang.version>
<commons-lang3.version>3.10</commons-lang3.version>
<curator.version>2.12.0</curator.version>
+ <etcd.version>0.7.1</etcd.version>
<delta.version>2.0.0rc1</delta.version>
<fb303.version>0.9.3</fb303.version>
<flink.version>1.14.5</flink.version>
@@ -119,6 +120,7 @@
<flink.archive.mirror>${apache.archive.dist}/flink/flink-${flink.version}</flink.archive.mirror>
<flink.archive.download.skip>false</flink.archive.download.skip>
<google.jsr305.version>3.0.2</google.jsr305.version>
+ <grpc.verion>1.47.0</grpc.verion>
<gson.version>2.8.9</gson.version>
<guava.version>30.1-jre</guava.version>
<guava.failureaccess.version>1.0.1</guava.failureaccess.version>
@@ -1023,6 +1025,54 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ <version>${etcd.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-launcher</artifactId>
+ <version>${etcd.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ <version>${grpc.verion}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-grpclb</artifactId>
+ <version>${grpc.verion}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${grpc.verion}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.verion}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.verion}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>${iceberg.name}</artifactId>