You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/15 17:36:22 UTC
[kyuubi] branch branch-1.7 updated: [KYUUBI #4492] Correct engine subdomain calculation in `kyuubi-ctl`
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 4ed4a448f [KYUUBI #4492] Correct engine subdomain calculation in `kyuubi-ctl`
4ed4a448f is described below
commit 4ed4a448f1b38da3780b48e8962ade3099237da1
Author: Alex <zo...@kanzhun.com>
AuthorDate: Thu Mar 16 01:35:46 2023 +0800
[KYUUBI #4492] Correct engine subdomain calculation in `kyuubi-ctl`
### _Why are the changes needed?_
CONNECTION share level engines always use a UUID as the subdomain in their registration path, so kyuubi-ctl's incorrect subdomain calculation (which falls back to `default` when no subdomain is given) causes listing engines to fail with a NoNode error.
```
./kyuubi-ctl list engine --zk-quorum xxx --namespace kyuubi --user xxx
```
```
2023-03-10 13:53:32.939 INFO org.apache.curator.framework.state.ConnectionStateManager: State change: CONNECTED
2023-03-10 13:53:32.945 ERROR org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient: Failed to get service node info
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kyuubi_1.7.0_CONNECTION_SPARK_SQL/xxx/default
at org.apache.zookeeper.KeeperException.create(KeeperException.java:114) ~[zookeeper-3.4.14.jar:3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf]
at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) ~[zookeeper-3.4.14.jar:3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf]
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1659) ~[zookeeper-3.4.14.jar:3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:230) ~[curator-framework-2.12.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:219) ~[curator-framework-2.12.0.jar:?]
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109) ~[curator-client-2.12.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:216) ~[curator-framework-2.12.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:207) ~[curator-framework-2.12.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:40) ~[curator-framework-2.12.0.jar:?]
at org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient.getServiceNodesInfo(ZookeeperDiscoveryClient.scala:214) ~[kyuubi-ha_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.util.CtlUtils$.getServiceNodes(CtlUtils.scala:63) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.util.CtlUtils$.$anonfun$listZkServerNodes$1(CtlUtils.scala:86) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.util.CtlUtils$.$anonfun$listZkServerNodes$1$adapted(CtlUtils.scala:80) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ha.client.DiscoveryClientProvider$.withDiscoveryClient(DiscoveryClientProvider.scala:36) ~[kyuubi-ha_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.util.CtlUtils$.listZkServerNodes(CtlUtils.scala:80) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.cmd.list.ListCommand.doRun(ListCommand.scala:32) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.cmd.list.ListCommand.doRun(ListCommand.scala:24) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.cmd.Command.run(Command.scala:47) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.cli.ControlCli.doAction(ControlCli.scala:46) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.cli.ControlCli$$anon$1.doAction(ControlCli.scala:79) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.cli.ControlCli$.main(ControlCli.scala:87) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
at org.apache.kyuubi.ctl.cli.ControlCli.main(ControlCli.scala) ~[kyuubi-ctl_2.12-1.7.0.jar:1.7.0]
2023-03-10 13:53:32.949 INFO org.apache.curator.framework.imps.CuratorFrameworkImpl: backgroundOperationsLoop exiting
2023-03-10 13:53:32.952 INFO org.apache.zookeeper.ZooKeeper: Session: 0x500011010ecc76e closed
2023-03-10 13:53:32.953 INFO org.apache.zookeeper.ClientCnxn: EventThread shut down for session: 0x500011010ecc76e
```
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
Built the kyuubi-ctl module, deployed it to my environment, and ran: kyuubi-ctl list engine -zk xxx:2181 -n kyuubi -u xxx
Got the correct result.
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #4492 from Kiss736921/fix_list_engine_error.
Closes #4492
29b87ed57 [Alex] change param name engine to engineNode
9e87b0853 [Alex] optimize get cmd architecture and complete delete engine for all share level
20544c296 [Cheng Pan] ctl should handle server and engine nodes seperately
9000129df [Alex] fix list engine no node exception
Lead-authored-by: Alex <zo...@kanzhun.com>
Co-authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
(cherry picked from commit 0f45f26a354490c763eec186dacf0615b783b9dd)
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../ctl/cmd/create/CreateServerCommand.scala | 2 +-
.../kyuubi/ctl/cmd/delete/DeleteCommand.scala | 31 +------
.../ctl/cmd/delete/DeleteEngineCommand.scala | 30 +++++++
.../ctl/cmd/delete/DeleteServerCommand.scala | 30 ++++++-
.../org/apache/kyuubi/ctl/cmd/get/GetCommand.scala | 8 +-
.../kyuubi/ctl/cmd/get/GetEngineCommand.scala | 10 +++
.../kyuubi/ctl/cmd/get/GetServerCommand.scala | 11 ++-
.../apache/kyuubi/ctl/cmd/list/ListCommand.scala | 8 +-
.../kyuubi/ctl/cmd/list/ListEngineCommand.scala | 5 ++
.../kyuubi/ctl/cmd/list/ListServerCommand.scala | 8 +-
.../org/apache/kyuubi/ctl/util/CtlUtils.scala | 99 ++++++++++++----------
.../org/apache/kyuubi/ctl/ControlCliSuite.scala | 85 ++++++++++++-------
12 files changed, 212 insertions(+), 115 deletions(-)
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateServerCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateServerCommand.scala
index 66f75fc5f..f4d4ce2ea 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateServerCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateServerCommand.scala
@@ -56,7 +56,7 @@ class CreateServerCommand(cliConfig: CliConfig) extends Command[Seq[ServiceNodeI
withDiscoveryClient(kyuubiConf) { discoveryClient =>
val fromNamespace =
DiscoveryPaths.makePath(null, kyuubiConf.get(HA_NAMESPACE))
- val toNamespace = CtlUtils.getZkNamespace(kyuubiConf, normalizedCliConfig)
+ val toNamespace = CtlUtils.getZkServerNamespace(kyuubiConf, normalizedCliConfig)
val currentServerNodes = discoveryClient.getServiceNodesInfo(fromNamespace)
val exposedServiceNodes = ListBuffer[ServiceNodeInfo]()
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteCommand.scala
index 69479259a..ddbe083ce 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteCommand.scala
@@ -16,15 +16,13 @@
*/
package org.apache.kyuubi.ctl.cmd.delete
-import scala.collection.mutable.ListBuffer
-
import org.apache.kyuubi.ctl.cmd.Command
import org.apache.kyuubi.ctl.opt.CliConfig
-import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
-import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+import org.apache.kyuubi.ctl.util.{Render, Validator}
import org.apache.kyuubi.ha.client.ServiceNodeInfo
-class DeleteCommand(cliConfig: CliConfig) extends Command[Seq[ServiceNodeInfo]](cliConfig) {
+abstract class DeleteCommand(cliConfig: CliConfig)
+ extends Command[Seq[ServiceNodeInfo]](cliConfig) {
def validate(): Unit = {
Validator.validateZkArguments(normalizedCliConfig)
@@ -35,28 +33,7 @@ class DeleteCommand(cliConfig: CliConfig) extends Command[Seq[ServiceNodeInfo]](
/**
* Delete zookeeper service node with specified host port.
*/
- def doRun(): Seq[ServiceNodeInfo] = {
- withDiscoveryClient(conf) { discoveryClient =>
- val znodeRoot = CtlUtils.getZkNamespace(conf, normalizedCliConfig)
- val hostPortOpt =
- Some((normalizedCliConfig.zkOpts.host, normalizedCliConfig.zkOpts.port.toInt))
- val nodesToDelete = CtlUtils.getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
-
- val deletedNodes = ListBuffer[ServiceNodeInfo]()
- nodesToDelete.foreach { node =>
- val nodePath = s"$znodeRoot/${node.nodeName}"
- info(s"Deleting zookeeper service node:$nodePath")
- try {
- discoveryClient.delete(nodePath)
- deletedNodes += node
- } catch {
- case e: Exception =>
- error(s"Failed to delete zookeeper service node:$nodePath", e)
- }
- }
- deletedNodes
- }
- }
+ def doRun(): Seq[ServiceNodeInfo]
def render(nodes: Seq[ServiceNodeInfo]): Unit = {
val title = "Deleted zookeeper service nodes"
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteEngineCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteEngineCommand.scala
index 7be607467..ab6e81e24 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteEngineCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteEngineCommand.scala
@@ -16,7 +16,12 @@
*/
package org.apache.kyuubi.ctl.cmd.delete
+import scala.collection.mutable.ListBuffer
+
import org.apache.kyuubi.ctl.opt.CliConfig
+import org.apache.kyuubi.ctl.util.CtlUtils
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
class DeleteEngineCommand(cliConfig: CliConfig) extends DeleteCommand(cliConfig) {
@@ -28,4 +33,29 @@ class DeleteEngineCommand(cliConfig: CliConfig) extends DeleteCommand(cliConfig)
fail("Must specify user name for engine, please use -u or --user.")
}
}
+
+ def doRun(): Seq[ServiceNodeInfo] = {
+ withDiscoveryClient(conf) { discoveryClient =>
+ val hostPortOpt =
+ Some((cliConfig.zkOpts.host, cliConfig.zkOpts.port.toInt))
+ val candidateNodes = CtlUtils.listZkEngineNodes(conf, normalizedCliConfig, hostPortOpt)
+ hostPortOpt.map { case (host, port) =>
+ candidateNodes.filter { cn => cn.host == host && cn.port == port }
+ }.getOrElse(candidateNodes)
+ val deletedNodes = ListBuffer[ServiceNodeInfo]()
+ candidateNodes.foreach { node =>
+ val engineNode = discoveryClient.getChildren(node.namespace)(0)
+ val nodePath = s"${node.namespace}/$engineNode"
+ info(s"Deleting zookeeper service node:$nodePath")
+ try {
+ discoveryClient.delete(nodePath)
+ deletedNodes += node
+ } catch {
+ case e: Exception =>
+ error(s"Failed to delete zookeeper service node:$nodePath", e)
+ }
+ }
+ deletedNodes
+ }
+ }
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteServerCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteServerCommand.scala
index 6debba4d5..197b78645 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteServerCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteServerCommand.scala
@@ -16,6 +16,34 @@
*/
package org.apache.kyuubi.ctl.cmd.delete
+import scala.collection.mutable.ListBuffer
+
import org.apache.kyuubi.ctl.opt.CliConfig
+import org.apache.kyuubi.ctl.util.CtlUtils
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
+
+class DeleteServerCommand(cliConfig: CliConfig) extends DeleteCommand(cliConfig) {
+ override def doRun(): Seq[ServiceNodeInfo] = {
+ withDiscoveryClient(conf) { discoveryClient =>
+ val znodeRoot = CtlUtils.getZkServerNamespace(conf, normalizedCliConfig)
+ val hostPortOpt =
+ Some((normalizedCliConfig.zkOpts.host, normalizedCliConfig.zkOpts.port.toInt))
+ val nodesToDelete = CtlUtils.getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
-class DeleteServerCommand(cliConfig: CliConfig) extends DeleteCommand(cliConfig) {}
+ val deletedNodes = ListBuffer[ServiceNodeInfo]()
+ nodesToDelete.foreach { node =>
+ val nodePath = s"$znodeRoot/${node.nodeName}"
+ info(s"Deleting zookeeper service node:$nodePath")
+ try {
+ discoveryClient.delete(nodePath)
+ deletedNodes += node
+ } catch {
+ case e: Exception =>
+ error(s"Failed to delete zookeeper service node:$nodePath", e)
+ }
+ }
+ deletedNodes
+ }
+ }
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetCommand.scala
index d78f0b995..af8285105 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetCommand.scala
@@ -18,10 +18,10 @@ package org.apache.kyuubi.ctl.cmd.get
import org.apache.kyuubi.ctl.cmd.Command
import org.apache.kyuubi.ctl.opt.CliConfig
-import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
+import org.apache.kyuubi.ctl.util.{Render, Validator}
import org.apache.kyuubi.ha.client.ServiceNodeInfo
-class GetCommand(cliConfig: CliConfig) extends Command[Seq[ServiceNodeInfo]](cliConfig) {
+abstract class GetCommand(cliConfig: CliConfig) extends Command[Seq[ServiceNodeInfo]](cliConfig) {
def validate(): Unit = {
Validator.validateZkArguments(normalizedCliConfig)
@@ -29,9 +29,7 @@ class GetCommand(cliConfig: CliConfig) extends Command[Seq[ServiceNodeInfo]](cli
mergeArgsIntoKyuubiConf()
}
- def doRun(): Seq[ServiceNodeInfo] = {
- CtlUtils.listZkServerNodes(conf, normalizedCliConfig, filterHostPort = true)
- }
+ def doRun(): Seq[ServiceNodeInfo]
def render(nodes: Seq[ServiceNodeInfo]): Unit = {
val title = "Zookeeper service nodes"
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetEngineCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetEngineCommand.scala
index 4d9101625..13f4d00c8 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetEngineCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetEngineCommand.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.ctl.cmd.get
import org.apache.kyuubi.ctl.opt.CliConfig
+import org.apache.kyuubi.ctl.util.CtlUtils
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
class GetEngineCommand(cliConfig: CliConfig) extends GetCommand(cliConfig) {
@@ -28,4 +30,12 @@ class GetEngineCommand(cliConfig: CliConfig) extends GetCommand(cliConfig) {
fail("Must specify user name for engine, please use -u or --user.")
}
}
+
+ override def doRun(): Seq[ServiceNodeInfo] = {
+ CtlUtils.listZkEngineNodes(
+ conf,
+ normalizedCliConfig,
+ Some((cliConfig.zkOpts.host, cliConfig.zkOpts.port.toInt)))
+ }
+
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetServerCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetServerCommand.scala
index 71b868453..faa76b219 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetServerCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetServerCommand.scala
@@ -17,5 +17,14 @@
package org.apache.kyuubi.ctl.cmd.get
import org.apache.kyuubi.ctl.opt.CliConfig
+import org.apache.kyuubi.ctl.util.CtlUtils
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
-class GetServerCommand(cliConfig: CliConfig) extends GetCommand(cliConfig) {}
+class GetServerCommand(cliConfig: CliConfig) extends GetCommand(cliConfig) {
+ override def doRun(): Seq[ServiceNodeInfo] = {
+ CtlUtils.listZkServerNodes(
+ conf,
+ normalizedCliConfig,
+ Some((cliConfig.zkOpts.host, cliConfig.zkOpts.port.toInt)))
+ }
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListCommand.scala
index 0cfeb8e4e..e5a3a6882 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListCommand.scala
@@ -18,19 +18,17 @@ package org.apache.kyuubi.ctl.cmd.list
import org.apache.kyuubi.ctl.cmd.Command
import org.apache.kyuubi.ctl.opt.CliConfig
-import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
+import org.apache.kyuubi.ctl.util.{Render, Validator}
import org.apache.kyuubi.ha.client.ServiceNodeInfo
-class ListCommand(cliConfig: CliConfig) extends Command[Seq[ServiceNodeInfo]](cliConfig) {
+abstract class ListCommand(cliConfig: CliConfig) extends Command[Seq[ServiceNodeInfo]](cliConfig) {
def validate(): Unit = {
Validator.validateZkArguments(normalizedCliConfig)
mergeArgsIntoKyuubiConf()
}
- def doRun(): Seq[ServiceNodeInfo] = {
- CtlUtils.listZkServerNodes(conf, normalizedCliConfig, filterHostPort = false)
- }
+ def doRun(): Seq[ServiceNodeInfo]
def render(nodes: Seq[ServiceNodeInfo]): Unit = {
val title = "Zookeeper service nodes"
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListEngineCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListEngineCommand.scala
index 6a78a9e97..8a26b4cc9 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListEngineCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListEngineCommand.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.ctl.cmd.list
import org.apache.kyuubi.ctl.opt.CliConfig
+import org.apache.kyuubi.ctl.util.CtlUtils
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
class ListEngineCommand(cliConfig: CliConfig) extends ListCommand(cliConfig) {
@@ -28,4 +30,7 @@ class ListEngineCommand(cliConfig: CliConfig) extends ListCommand(cliConfig) {
fail("Must specify user name for engine, please use -u or --user.")
}
}
+
+ override def doRun(): Seq[ServiceNodeInfo] =
+ CtlUtils.listZkEngineNodes(conf, normalizedCliConfig, None)
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListServerCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListServerCommand.scala
index 8c3219ece..56e8f4695 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListServerCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListServerCommand.scala
@@ -17,5 +17,11 @@
package org.apache.kyuubi.ctl.cmd.list
import org.apache.kyuubi.ctl.opt.CliConfig
+import org.apache.kyuubi.ctl.util.CtlUtils
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
-class ListServerCommand(cliConfig: CliConfig) extends ListCommand(cliConfig) {}
+class ListServerCommand(cliConfig: CliConfig) extends ListCommand(cliConfig) {
+ override def doRun(): Seq[ServiceNodeInfo] = {
+ CtlUtils.listZkServerNodes(conf, normalizedCliConfig, None)
+ }
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CtlUtils.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CtlUtils.scala
index fdcc127f1..8ce1d611a 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CtlUtils.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CtlUtils.scala
@@ -25,48 +25,35 @@ import org.yaml.snakeyaml.Yaml
import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUBDOMAIN, ENGINE_TYPE}
-import org.apache.kyuubi.ctl.opt.{CliConfig, ControlObject}
+import org.apache.kyuubi.ctl.opt.CliConfig
import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
object CtlUtils {
- private[ctl] def getZkNamespace(conf: KyuubiConf, cliConfig: CliConfig): String = {
- cliConfig.resource match {
- case ControlObject.SERVER =>
- DiscoveryPaths.makePath(null, cliConfig.zkOpts.namespace)
- case ControlObject.ENGINE =>
- val engineType = Some(cliConfig.engineOpts.engineType)
- .filter(_ != null).filter(_.nonEmpty)
- .getOrElse(conf.get(ENGINE_TYPE))
- val engineSubdomain = Some(cliConfig.engineOpts.engineSubdomain)
- .filter(_ != null).filter(_.nonEmpty)
- .getOrElse(conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default"))
- val engineShareLevel = Some(cliConfig.engineOpts.engineShareLevel)
- .filter(_ != null).filter(_.nonEmpty)
- .getOrElse(conf.get(ENGINE_SHARE_LEVEL))
- // The path of the engine defined in zookeeper comes from
- // org.apache.kyuubi.engine.EngineRef#engineSpace
- DiscoveryPaths.makePath(
- s"${cliConfig.zkOpts.namespace}_" +
- s"${cliConfig.zkOpts.version}_" +
- s"${engineShareLevel}_${engineType}",
- cliConfig.engineOpts.user,
- engineSubdomain)
- }
+ private[ctl] def getZkServerNamespace(conf: KyuubiConf, cliConfig: CliConfig): String = {
+ DiscoveryPaths.makePath(null, cliConfig.zkOpts.namespace)
}
- private[ctl] def getServiceNodes(
- discoveryClient: DiscoveryClient,
- znodeRoot: String,
- hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
- val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
- hostPortOpt match {
- case Some((host, port)) => serviceNodes.filter { sn =>
- sn.host == host && sn.port == port
- }
- case _ => serviceNodes
- }
+ private[ctl] def getZkEngineNamespaceAndSubdomain(
+ conf: KyuubiConf,
+ cliConfig: CliConfig): (String, Option[String]) = {
+ val engineType = Some(cliConfig.engineOpts.engineType)
+ .filter(_ != null).filter(_.nonEmpty)
+ .getOrElse(conf.get(ENGINE_TYPE))
+ val engineShareLevel = Some(cliConfig.engineOpts.engineShareLevel)
+ .filter(_ != null).filter(_.nonEmpty)
+ .getOrElse(conf.get(ENGINE_SHARE_LEVEL))
+ val engineSubdomain = Option(cliConfig.engineOpts.engineSubdomain)
+ .filter(_.nonEmpty).orElse(conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN))
+ // The path of the engine defined in zookeeper comes from
+ // org.apache.kyuubi.engine.EngineRef#engineSpace
+ val rootPath = DiscoveryPaths.makePath(
+ s"${cliConfig.zkOpts.namespace}_" +
+ s"${cliConfig.zkOpts.version}_" +
+ s"${engineShareLevel}_${engineType}",
+ cliConfig.engineOpts.user)
+ (rootPath, engineSubdomain)
}
/**
@@ -75,17 +62,41 @@ object CtlUtils {
private[ctl] def listZkServerNodes(
conf: KyuubiConf,
cliConfig: CliConfig,
- filterHostPort: Boolean): Seq[ServiceNodeInfo] = {
- var nodes = Seq.empty[ServiceNodeInfo]
+ hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
withDiscoveryClient(conf) { discoveryClient =>
- val znodeRoot = getZkNamespace(conf, cliConfig)
- val hostPortOpt =
- if (filterHostPort) {
- Some((cliConfig.zkOpts.host, cliConfig.zkOpts.port.toInt))
- } else None
- nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
+ val znodeRoot = getZkServerNamespace(conf, cliConfig)
+ getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
}
- nodes
+ }
+
+ /**
+ * List Kyuubi engine nodes info.
+ */
+ private[ctl] def listZkEngineNodes(
+ conf: KyuubiConf,
+ cliConfig: CliConfig,
+ hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
+ withDiscoveryClient(conf) { discoveryClient =>
+ val (znodeRoot, subdomainOpt) = getZkEngineNamespaceAndSubdomain(conf, cliConfig)
+ val candidates = discoveryClient.getChildren(znodeRoot)
+ val matched = subdomainOpt match {
+ case Some(subdomain) => candidates.filter(_ == subdomain)
+ case None => candidates
+ }
+ matched.flatMap { subdomain =>
+ getServiceNodes(discoveryClient, s"$znodeRoot/$subdomain", hostPortOpt)
+ }
+ }
+ }
+
+ private[ctl] def getServiceNodes(
+ discoveryClient: DiscoveryClient,
+ znodeRoot: String,
+ hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
+ val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
+ hostPortOpt.map { case (host, port) =>
+ serviceNodes.filter { sn => sn.host == host && sn.port == port }
+ }.getOrElse(serviceNodes)
}
private[ctl] def loadYamlAsMap(cliConfig: CliConfig): JMap[String, Object] = {
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala
index d27f3ec2a..43a694a08 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala
@@ -199,20 +199,23 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
}
}
- test("test get zk namespace for different service type") {
- val arg1 = Array(
+ test("test get zk server namespace") {
+ val args = Array(
"list",
"server",
"--zk-quorum",
zkServer.getConnectString,
"--namespace",
namespace)
- val scArgs1 = new ControlCliArguments(arg1)
- assert(CtlUtils.getZkNamespace(
- scArgs1.command.conf,
- scArgs1.command.normalizedCliConfig) == s"/$namespace")
+ val scArgs = new ControlCliArguments(args)
+ assert(
+ CtlUtils.getZkServerNamespace(
+ scArgs.command.conf,
+ scArgs.command.normalizedCliConfig) === s"/$namespace")
+ }
- val arg2 = Array(
+ test("test get zk engine namespace") {
+ val args = Array(
"list",
"engine",
"--zk-quorum",
@@ -221,9 +224,11 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
namespace,
"--user",
user)
- val scArgs2 = new ControlCliArguments(arg2)
- assert(CtlUtils.getZkNamespace(scArgs2.command.conf, scArgs2.command.normalizedCliConfig) ==
- s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
+ val scArgs = new ControlCliArguments(args)
+ val expected = (s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user", None)
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs.command.conf,
+ scArgs.command.normalizedCliConfig) === expected)
}
test("test list zk service nodes info") {
@@ -364,8 +369,10 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--user",
user)
val scArgs1 = new ControlCliArguments(arg1)
- assert(CtlUtils.getZkNamespace(scArgs1.command.conf, scArgs1.command.normalizedCliConfig) ==
- s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
+ val expected1 = (s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user", None)
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs1.command.conf,
+ scArgs1.command.normalizedCliConfig) === expected1)
val arg2 = Array(
"list",
@@ -379,8 +386,10 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-type",
"FLINK_SQL")
val scArgs2 = new ControlCliArguments(arg2)
- assert(CtlUtils.getZkNamespace(scArgs2.command.conf, scArgs2.command.normalizedCliConfig) ==
- s"/${namespace}_${KYUUBI_VERSION}_USER_FLINK_SQL/$user/default")
+ val expected2 = (s"/${namespace}_${KYUUBI_VERSION}_USER_FLINK_SQL/$user", None)
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs2.command.conf,
+ scArgs2.command.normalizedCliConfig) === expected2)
val arg3 = Array(
"list",
@@ -394,8 +403,10 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-type",
"TRINO")
val scArgs3 = new ControlCliArguments(arg3)
- assert(CtlUtils.getZkNamespace(scArgs3.command.conf, scArgs3.command.normalizedCliConfig) ==
- s"/${namespace}_${KYUUBI_VERSION}_USER_TRINO/$user/default")
+ val expected3 = (s"/${namespace}_${KYUUBI_VERSION}_USER_TRINO/$user", None)
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs3.command.conf,
+ scArgs3.command.normalizedCliConfig) === expected3)
val arg4 = Array(
"list",
@@ -411,8 +422,10 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-subdomain",
"sub_1")
val scArgs4 = new ControlCliArguments(arg4)
- assert(CtlUtils.getZkNamespace(scArgs4.command.conf, scArgs4.command.normalizedCliConfig) ==
- s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/sub_1")
+ val expected4 = (s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user", Some("sub_1"))
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs4.command.conf,
+ scArgs4.command.normalizedCliConfig) === expected4)
val arg5 = Array(
"list",
@@ -430,8 +443,10 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-subdomain",
"sub_1")
val scArgs5 = new ControlCliArguments(arg5)
- assert(CtlUtils.getZkNamespace(scArgs5.command.conf, scArgs5.command.normalizedCliConfig) ==
- s"/${namespace}_1.5.0_USER_SPARK_SQL/$user/sub_1")
+ val expected5 = (s"/${namespace}_1.5.0_USER_SPARK_SQL/$user", Some("sub_1"))
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs5.command.conf,
+ scArgs5.command.normalizedCliConfig) === expected5)
}
test("test get zk namespace for different share level engines") {
@@ -445,8 +460,10 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--user",
user)
val scArgs1 = new ControlCliArguments(arg1)
- assert(CtlUtils.getZkNamespace(scArgs1.command.conf, scArgs1.command.normalizedCliConfig) ==
- s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
+ val expected1 = (s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user", None)
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs1.command.conf,
+ scArgs1.command.normalizedCliConfig) === expected1)
val arg2 = Array(
"list",
@@ -460,8 +477,10 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-share-level",
"CONNECTION")
val scArgs2 = new ControlCliArguments(arg2)
- assert(CtlUtils.getZkNamespace(scArgs2.command.conf, scArgs2.command.normalizedCliConfig) ==
- s"/${namespace}_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL/$user/default")
+ val expected2 = (s"/${namespace}_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL/$user", None)
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs2.command.conf,
+ scArgs2.command.normalizedCliConfig) === expected2)
val arg3 = Array(
"list",
@@ -475,8 +494,10 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-share-level",
"USER")
val scArgs3 = new ControlCliArguments(arg3)
- assert(CtlUtils.getZkNamespace(scArgs3.command.conf, scArgs3.command.normalizedCliConfig) ==
- s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
+ val expected3 = (s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user", None)
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs3.command.conf,
+ scArgs3.command.normalizedCliConfig) === expected3)
val arg4 = Array(
"list",
@@ -490,8 +511,10 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-share-level",
"GROUP")
val scArgs4 = new ControlCliArguments(arg4)
- assert(CtlUtils.getZkNamespace(scArgs4.command.conf, scArgs4.command.normalizedCliConfig) ==
- s"/${namespace}_${KYUUBI_VERSION}_GROUP_SPARK_SQL/$user/default")
+ val expected4 = (s"/${namespace}_${KYUUBI_VERSION}_GROUP_SPARK_SQL/$user", None)
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs4.command.conf,
+ scArgs4.command.normalizedCliConfig) === expected4)
val arg5 = Array(
"list",
@@ -505,7 +528,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-share-level",
"SERVER")
val scArgs5 = new ControlCliArguments(arg5)
- assert(CtlUtils.getZkNamespace(scArgs5.command.conf, scArgs5.command.normalizedCliConfig) ==
- s"/${namespace}_${KYUUBI_VERSION}_SERVER_SPARK_SQL/$user/default")
+ val expected5 = (s"/${namespace}_${KYUUBI_VERSION}_SERVER_SPARK_SQL/$user", None)
+ assert(CtlUtils.getZkEngineNamespaceAndSubdomain(
+ scArgs5.command.conf,
+ scArgs5.command.normalizedCliConfig) === expected5)
}
}