You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/14 03:19:27 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2628][SUB-TASK][KPIP-4] Implement kyuubi-ctl for batch job operation
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new cb4833859 [KYUUBI #2628][SUB-TASK][KPIP-4] Implement kyuubi-ctl for batch job operation
cb4833859 is described below
commit cb48338591686cfd7b6f8ea47c2b309995fec728
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Tue Jun 14 11:19:18 2022 +0800
[KYUUBI #2628][SUB-TASK][KPIP-4] Implement kyuubi-ctl for batch job operation
### _Why are the changes needed?_
To close #2628
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2823 from lightning-L/kyuubi-2628.
Closes #2628
3af66099 [Tianlin Liao] refactor
19ca40fe [Tianlin Liao] fix list/log batch
d0fc0a9d [Tianlin Liao] refactor RestClientFactory
617732c8 [Tianlin Liao] refactor command
897afd38 [Tianlin Liao] rename ClientFactory to RestClientFactory
9ed61166 [Tianlin Liao] add list batch option
953e7169 [Tianlin Liao] refactor kyuubi-ctl conf
ad2a9b6f [Tianlin Liao] fix license and style
60943eb2 [Tianlin Liao] fix delete/list operation after rebase
97cbe9df [Tianlin Liao] add kyuubi rest config: hostUrl, spnegoHost, authSchema
c46e8f7a [Tianlin Liao] implement create/get/delete/list/log/submit batch command
71c9cb9a [Tianlin Liao] reorganize cmd folder and rename command class
cf7478bb [Tianlin Liao] use reflect to generate command object
e985c7bb [Tianlin Liao] rename ServiceControlXXX to ControlXXX
Authored-by: Tianlin Liao <ti...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
bin/kyuubi-ctl | 4 +-
docs/deployment/settings.md | 9 +
kyuubi-ctl/pom.xml | 28 ++
.../scala/org/apache/kyuubi/ctl/CliConfig.scala | 39 ++-
.../scala/org/apache/kyuubi/ctl/CommandLine.scala | 186 ++++++++++++-
.../scala/org/apache/kyuubi/ctl/ControlCli.scala | 5 +-
.../apache/kyuubi/ctl/ControlCliArguments.scala | 108 +++++---
.../main/scala/org/apache/kyuubi/ctl/CtlConf.scala | 48 ++++
.../org/apache/kyuubi/ctl/RestClientFactory.scala | 115 ++++++++
.../scala/org/apache/kyuubi/ctl/cmd/Command.scala | 120 +++------
.../org/apache/kyuubi/ctl/cmd/ListCommand.scala | 53 ----
.../kyuubi/ctl/cmd/create/CreateBatchCommand.scala | 54 ++++
.../CreateServerCommand.scala} | 34 +--
.../delete/DeleteBatchCommand.scala} | 31 ++-
.../ctl/cmd/{ => delete}/DeleteCommand.scala | 27 +-
.../delete/DeleteEngineCommand.scala} | 20 +-
.../delete/DeleteServerCommand.scala} | 18 +-
.../kyuubi/ctl/cmd/get/GetBatchCommand.scala | 43 +++
.../kyuubi/ctl/cmd/{ => get}/GetCommand.scala | 32 +--
.../get/GetEngineCommand.scala} | 20 +-
.../get/GetServerCommand.scala} | 18 +-
.../kyuubi/ctl/cmd/list/ListBatchCommand.scala | 58 ++++
.../{Render.scala => cmd/list/ListCommand.scala} | 28 +-
.../list/ListEngineCommand.scala} | 20 +-
.../list/ListServerCommand.scala} | 18 +-
.../kyuubi/ctl/cmd/log/LogBatchCommand.scala | 71 +++++
.../kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala | 76 ++++++
.../{Render.scala => util/CommandLineUtils.scala} | 33 ++-
.../org/apache/kyuubi/ctl/util/CtlUtils.scala | 110 ++++++++
.../ctl/{Render.scala => util/DateTimeUtils.scala} | 28 +-
.../scala/org/apache/kyuubi/ctl/util/Render.scala | 70 +++++
.../Tabulator.scala} | 24 +-
.../org/apache/kyuubi/ctl/util/Validator.scala | 80 ++++++
kyuubi-ctl/src/test/resources/cli/batch.yaml | 32 +++
.../apache/kyuubi/ctl/BatchCliArgumentsSuite.scala | 253 ++++++++++++++++++
.../kyuubi/ctl/ControlCliArgumentsSuite.scala | 177 +++++++------
.../org/apache/kyuubi/ctl/ControlCliSuite.scala | 110 +++-----
.../org/apache/kyuubi/ctl/TestPrematureExit.scala | 102 +++++++
.../org/apache/kyuubi/client/BatchRestApi.java | 4 +-
kyuubi-server/pom.xml | 15 ++
.../kyuubi/config/AllKyuubiConfiguration.scala | 4 +-
.../kyuubi/server/rest/client/BatchCliSuite.scala | 294 +++++++++++++++++++++
pom.xml | 1 +
43 files changed, 2065 insertions(+), 555 deletions(-)
diff --git a/bin/kyuubi-ctl b/bin/kyuubi-ctl
index 3ff03ddff..d989eeb2d 100755
--- a/bin/kyuubi-ctl
+++ b/bin/kyuubi-ctl
@@ -16,8 +16,8 @@
# limitations under the License.
#
-## Kyuubi Service Control Client Entrance
-CLASS="org.apache.kyuubi.ctl.ServiceControlCli"
+## Kyuubi Control Client Entrance
+CLASS="org.apache.kyuubi.ctl.ControlCli"
export KYUUBI_HOME="$(cd "$(dirname "$0")"/..; pwd)"
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 3650caa5d..54285fd86 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -183,6 +183,15 @@ kyuubi.credentials.renewal.retry.wait|PT1M|How long to wait before retrying to f
kyuubi.credentials.update.wait.timeout|PT1M|How long to wait until credentials are ready.|duration|1.5.0
+### Ctl
+
+Key | Default | Meaning | Type | Since
+--- | --- | --- | --- | ---
+kyuubi.ctl.rest.auth.schema|basic|The authentication schema. Valid values are: basic, spnego.|string|1.6.0
+kyuubi.ctl.rest.base.url|<undefined>|The REST API base URL, which contains the scheme (http:// or https://), host name, port number|string|1.6.0
+kyuubi.ctl.rest.spnego.host|<undefined>|When auth schema is spnego, need to config spnego host.|string|1.6.0
+
+
### Delegation
Key | Default | Meaning | Type | Since
diff --git a/kyuubi-ctl/pom.xml b/kyuubi-ctl/pom.xml
index c1235c645..984f8661e 100644
--- a/kyuubi-ctl/pom.xml
+++ b/kyuubi-ctl/pom.xml
@@ -44,6 +44,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-rest-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
@@ -71,6 +77,12 @@
<artifactId>scopt_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>${snakeyaml.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
@@ -113,5 +125,21 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-test-jar</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
</build>
</project>
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
index 1fc04b33b..19542aa6f 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
@@ -16,23 +16,26 @@
*/
package org.apache.kyuubi.ctl
-import org.apache.kyuubi.ctl.ServiceControlAction.ControlAction
-import org.apache.kyuubi.ctl.ServiceControlObject.ControlObject
+import org.apache.kyuubi.ctl.ControlAction.ControlAction
+import org.apache.kyuubi.ctl.ControlObject.ControlObject
-private[ctl] object ServiceControlAction extends Enumeration {
+private[ctl] object ControlAction extends Enumeration {
type ControlAction = Value
- val CREATE, GET, DELETE, LIST = Value
+ val CREATE, GET, DELETE, LIST, LOG, SUBMIT = Value
}
-private[ctl] object ServiceControlObject extends Enumeration {
+private[ctl] object ControlObject extends Enumeration {
type ControlObject = Value
- val SERVER, ENGINE = Value
+ val SERVER, ENGINE, BATCH = Value
}
case class CliConfig(
action: ControlAction = null,
- service: ControlObject = ServiceControlObject.SERVER,
+ resource: ControlObject = ControlObject.SERVER,
commonOpts: CommonOpts = CommonOpts(),
+ createOpts: CreateOpts = CreateOpts(),
+ logOpts: LogOpts = LogOpts(),
+ batchOpts: BatchOpts = BatchOpts(),
engineOpts: EngineOpts = EngineOpts())
case class CommonOpts(
@@ -41,7 +44,27 @@ case class CommonOpts(
host: String = null,
port: String = null,
version: String = null,
- verbose: Boolean = false)
+ verbose: Boolean = false,
+ hostUrl: String = null,
+ authSchema: String = null,
+ username: String = null,
+ password: String = null,
+ spnegoHost: String = null)
+
+case class CreateOpts(filename: String = null)
+
+case class LogOpts(forward: Boolean = false)
+
+case class BatchOpts(
+ batchId: String = null,
+ batchType: String = null,
+ batchUser: String = null,
+ batchState: String = null,
+ createTime: Long = 0,
+ endTime: Long = 0,
+ from: Int = -1,
+ size: Int = 100,
+ hs2ProxyUser: String = null)
case class EngineOpts(
user: String = null,
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
index 903c6cd48..cc3fb7a62 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.ctl
import scopt.{OParser, OParserBuilder}
import org.apache.kyuubi.KYUUBI_VERSION
+import org.apache.kyuubi.ctl.util.DateTimeUtils._
object CommandLine {
@@ -32,9 +33,14 @@ object CommandLine {
get(builder),
delete(builder),
list(builder),
+ log(builder),
+ submit(builder),
checkConfig(f => {
- if (f.action == null) failure("Must specify action command: [create|get|delete|list].")
- else success
+ if (f.action == null) {
+ failure("Must specify action command: [create|get|delete|list|log|submit].")
+ } else {
+ success
+ }
}),
note(""),
help('h', "help").text("Show help message and exit."))
@@ -62,7 +68,22 @@ object CommandLine {
" change it if the active service is running in another."),
opt[Unit]('b', "verbose")
.action((_, c) => c.copy(commonOpts = c.commonOpts.copy(verbose = true)))
- .text("Print additional debug output."))
+ .text("Print additional debug output."),
+ opt[String]("hostUrl")
+ .action((v, c) => c.copy(commonOpts = c.commonOpts.copy(hostUrl = v)))
+ .text("Host url for rest api."),
+ opt[String]("authSchema")
+ .action((v, c) => c.copy(commonOpts = c.commonOpts.copy(authSchema = v)))
+ .text("Auth schema for rest api, valid values are basic, spnego."),
+ opt[String]("username")
+ .action((v, c) => c.copy(commonOpts = c.commonOpts.copy(username = v)))
+ .text("Username for basic authentication."),
+ opt[String]("password")
+ .action((v, c) => c.copy(commonOpts = c.commonOpts.copy(password = v)))
+ .text("Password for basic authentication."),
+ opt[String]("spnegoHost")
+ .action((v, c) => c.copy(commonOpts = c.commonOpts.copy(spnegoHost = v)))
+ .text("Spnego host for spnego authentication."))
}
private def create(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
@@ -70,8 +91,13 @@ object CommandLine {
OParser.sequence(
note(""),
cmd("create")
- .action((_, c) => c.copy(action = ServiceControlAction.CREATE))
+ .text("\tCreate a resource.")
+ .action((_, c) => c.copy(action = ControlAction.CREATE))
.children(
+ opt[String]('f', "filename")
+ .action((v, c) => c.copy(createOpts = c.createOpts.copy(filename = v)))
+ .text("Filename to use to create the resource"),
+ batchCmd(builder).text("\tOpen batch session."),
serverCmd(builder).text("\tExpose Kyuubi server instance to another domain.")))
}
@@ -80,9 +106,10 @@ object CommandLine {
OParser.sequence(
note(""),
cmd("get")
- .text("\tGet the service/engine node info, host and port needed.")
- .action((_, c) => c.copy(action = ServiceControlAction.GET))
+ .text("\tDisplay information about the specified resources.")
+ .action((_, c) => c.copy(action = ControlAction.GET))
.children(
+ getBatchCmd(builder).text("\tGet batch by id."),
serverCmd(builder).text("\tGet Kyuubi server info of domain"),
engineCmd(builder).text("\tGet Kyuubi engine info belong to a user.")))
@@ -93,9 +120,10 @@ object CommandLine {
OParser.sequence(
note(""),
cmd("delete")
- .text("\tDelete the specified service/engine node, host and port needed.")
- .action((_, c) => c.copy(action = ServiceControlAction.DELETE))
+ .text("\tDelete resources.")
+ .action((_, c) => c.copy(action = ControlAction.DELETE))
.children(
+ deleteBatchCmd(builder).text("\tClose batch session."),
serverCmd(builder).text("\tDelete the specified service node for a domain"),
engineCmd(builder).text("\tDelete the specified engine node for user.")))
@@ -106,22 +134,51 @@ object CommandLine {
OParser.sequence(
note(""),
cmd("list")
- .text("\tList all the service/engine nodes for a particular domain.")
- .action((_, c) => c.copy(action = ServiceControlAction.LIST))
+ .text("\tList information about resources.")
+ .action((_, c) => c.copy(action = ControlAction.LIST))
.children(
+ listBatchCmd(builder).text("\tList batch session info."),
serverCmd(builder).text("\tList all the service nodes for a particular domain"),
engineCmd(builder).text("\tList all the engine nodes for a user")))
}
+ private def log(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+ import builder._
+ OParser.sequence(
+ note(""),
+ cmd("log")
+ .text("\tPrint the logs for specified resource.")
+ .action((_, c) => c.copy(action = ControlAction.LOG))
+ .children(
+ opt[Unit]("forward")
+ .action((_, c) => c.copy(logOpts = c.logOpts.copy(forward = true)))
+ .text("If forward is specified, the ctl will block forever."),
+ logBatchCmd(builder).text("\tGet batch session local log.")))
+ }
+
+ private def submit(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+ import builder._
+ OParser.sequence(
+ note(""),
+ cmd("submit")
+ .text("\tCombination of create, get and log commands.")
+ .action((_, c) => c.copy(action = ControlAction.SUBMIT))
+ .children(
+ opt[String]('f', "filename")
+ .action((v, c) => c.copy(createOpts = c.createOpts.copy(filename = v)))
+ .text("Filename to use to create the resource"),
+ batchCmd(builder).text("\topen batch session and wait for completion.")))
+ }
+
private def serverCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
import builder._
- cmd("server").action((_, c) => c.copy(service = ServiceControlObject.SERVER))
+ cmd("server").action((_, c) => c.copy(resource = ControlObject.SERVER))
}
private def engineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
import builder._
- cmd("engine").action((_, c) => c.copy(service = ServiceControlObject.ENGINE))
+ cmd("engine").action((_, c) => c.copy(resource = ControlObject.ENGINE))
.children(
opt[String]('u', "user")
.action((v, c) => c.copy(engineOpts = c.engineOpts.copy(user = v)))
@@ -137,4 +194,109 @@ object CommandLine {
.text("The engine share level this engine belong to."))
}
+ private def batchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+ import builder._
+ cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+ }
+
+ private def getBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+ import builder._
+ cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+ .children(
+ arg[String]("<batchId>")
+ .optional()
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchId = v)))
+ .text("Batch id."))
+ }
+
+ private def deleteBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+ import builder._
+ cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+ .children(
+ arg[String]("<batchId>")
+ .optional()
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchId = v)))
+ .text("Batch id."),
+ opt[String]("hs2ProxyUser") // kept in batchOpts.hs2ProxyUser, not in createOpts.filename
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(hs2ProxyUser = v)))
+ .text("The value of hive.server2.proxy.user config."))
+ }
+
+ private def listBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+ import builder._
+ cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+ .children(
+ opt[String]("batchType")
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchType = v)))
+ .text("Batch type."),
+ opt[String]("batchUser")
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchUser = v)))
+ .text("Batch user."),
+ opt[String]("batchState")
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchState = v)))
+ .text("Batch state."),
+ opt[String]("createTime")
+ .action((v, c) =>
+ c.copy(batchOpts = c.batchOpts.copy(createTime =
+ dateStringToMillis(v, "yyyyMMddHHmmss"))))
+ .validate(x =>
+ if (x.matches("\\d{14}")) {
+ success
+ } else {
+ failure("Option --createTime must be in yyyyMMddHHmmss format.")
+ })
+ .text("Batch create time, should be in yyyyMMddHHmmss format."),
+ opt[String]("endTime")
+ .action((v, c) =>
+ c.copy(batchOpts = c.batchOpts.copy(endTime =
+ dateStringToMillis(v, "yyyyMMddHHmmss"))))
+ .validate(x =>
+ if (x.matches("\\d{14}")) {
+ success
+ } else {
+ failure("Option --endTime must be in yyyyMMddHHmmss format.")
+ })
+ .text("Batch end time, should be in yyyyMMddHHmmss format."),
+ opt[Int]("from")
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(from = v)))
+ .validate(x =>
+ if (x >= 0) {
+ success
+ } else {
+ failure("Option --from must be >=0")
+ })
+ .text("Specify which record to start from retrieving info."),
+ opt[Int]("size")
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(size = v)))
+ .validate(x =>
+ if (x >= 0) {
+ success
+ } else {
+ failure("Option --size must be >=0")
+ })
+ .text("The max number of records returned in the query."))
+ }
+
+ private def logBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+ import builder._
+ cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+ .children(
+ arg[String]("<batchId>")
+ .optional()
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchId = v)))
+ .text("Batch id."),
+ opt[Int]("from")
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(from = v)))
+ .text("Specify which record to start from retrieving info."),
+ opt[Int]("size")
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(size = v)))
+ .validate(x =>
+ if (x >= 0) {
+ success
+ } else {
+ failure("Option --size must be >=0")
+ })
+ .text("The max number of records returned in the query."))
+ }
+
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCli.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCli.scala
index ab87568b4..93b87c981 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCli.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCli.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.ctl
import org.apache.kyuubi.Logging
+import org.apache.kyuubi.ctl.util.CommandLineUtils
/**
* Main gateway of launching a Kyuubi Ctl action.
@@ -32,11 +33,11 @@ private[kyuubi] class ControlCli extends Logging {
val ctlArgs = parseArguments(args)
// when parse failed, exit
- if (ctlArgs.cliArgs == null) {
+ if (ctlArgs.cliConfig == null) {
sys.exit(1)
}
- val verbose = ctlArgs.cliArgs.commonOpts.verbose
+ val verbose = ctlArgs.cliConfig.commonOpts.verbose
if (verbose) {
super.info(ctlArgs.toString)
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCliArguments.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCliArguments.scala
index 29d4724d5..3d90b60b9 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCliArguments.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCliArguments.scala
@@ -19,13 +19,19 @@ package org.apache.kyuubi.ctl
import scopt.OParser
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.ctl.cmd._
+import org.apache.kyuubi.ctl.cmd.create.{CreateBatchCommand, CreateServerCommand}
+import org.apache.kyuubi.ctl.cmd.delete.{DeleteBatchCommand, DeleteEngineCommand, DeleteServerCommand}
+import org.apache.kyuubi.ctl.cmd.get.{GetBatchCommand, GetEngineCommand, GetServerCommand}
+import org.apache.kyuubi.ctl.cmd.list.{ListBatchCommand, ListEngineCommand, ListServerCommand}
+import org.apache.kyuubi.ctl.cmd.log.LogBatchCommand
+import org.apache.kyuubi.ctl.cmd.submit.SubmitBatchCommand
class ControlCliArguments(args: Seq[String], env: Map[String, String] = sys.env)
extends ControlCliArgumentsParser with Logging {
- var cliArgs: CliConfig = null
+ var cliConfig: CliConfig = null
var command: Command = null
@@ -48,48 +54,88 @@ class ControlCliArguments(args: Seq[String], env: Map[String, String] = sys.env)
result match {
case Some(arguments) =>
command = getCommand(arguments)
- command.preProcess()
- cliArgs = command.cliArgs
+ command.validate()
+ cliConfig = command.normalizedCliConfig
case _ =>
// arguments are bad, exit
}
}
}
- private def getCommand(cliArgs: CliConfig): Command = {
- cliArgs.action match {
- case ServiceControlAction.CREATE => new CreateCommand(cliArgs)
- case ServiceControlAction.GET => new GetCommand(cliArgs)
- case ServiceControlAction.DELETE => new DeleteCommand(cliArgs)
- case ServiceControlAction.LIST => new ListCommand(cliArgs)
- case _ => null
+ private def getCommand(cliConfig: CliConfig): Command = {
+ cliConfig.action match {
+ case ControlAction.CREATE => cliConfig.resource match {
+ case ControlObject.BATCH => new CreateBatchCommand(cliConfig)
+ case ControlObject.SERVER => new CreateServerCommand(cliConfig)
+ case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+ }
+ case ControlAction.GET => cliConfig.resource match {
+ case ControlObject.BATCH => new GetBatchCommand(cliConfig)
+ case ControlObject.ENGINE => new GetEngineCommand(cliConfig)
+ case ControlObject.SERVER => new GetServerCommand(cliConfig)
+ case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+ }
+ case ControlAction.DELETE => cliConfig.resource match {
+ case ControlObject.BATCH => new DeleteBatchCommand(cliConfig)
+ case ControlObject.ENGINE => new DeleteEngineCommand(cliConfig)
+ case ControlObject.SERVER => new DeleteServerCommand(cliConfig)
+ case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+ }
+ case ControlAction.LIST => cliConfig.resource match {
+ case ControlObject.BATCH => new ListBatchCommand(cliConfig)
+ case ControlObject.ENGINE => new ListEngineCommand(cliConfig)
+ case ControlObject.SERVER => new ListServerCommand(cliConfig)
+ case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+ }
+ case ControlAction.LOG => cliConfig.resource match {
+ case ControlObject.BATCH => new LogBatchCommand(cliConfig)
+ case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+ }
+ case ControlAction.SUBMIT => cliConfig.resource match {
+ case ControlObject.BATCH => new SubmitBatchCommand(cliConfig)
+ case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+ }
+ case _ => throw new KyuubiException(s"Invalid operation: ${cliConfig.action}")
}
}
override def toString: String = {
- cliArgs.service match {
- case ServiceControlObject.SERVER =>
+ cliConfig.resource match {
+ case ControlObject.BATCH =>
+ s"""Parsed arguments:
+ | action ${cliConfig.action}
+ | resource ${cliConfig.resource}
+ | batchId ${cliConfig.batchOpts.batchId}
+ | batchType ${cliConfig.batchOpts.batchType}
+ | batchUser ${cliConfig.batchOpts.batchUser}
+ | batchState ${cliConfig.batchOpts.batchState}
+ | createTime ${cliConfig.batchOpts.createTime}
+ | endTime ${cliConfig.batchOpts.endTime}
+ | from ${cliConfig.batchOpts.from}
+ | size ${cliConfig.batchOpts.size}
+ """.stripMargin
+ case ControlObject.SERVER =>
s"""Parsed arguments:
- | action ${cliArgs.action}
- | service ${cliArgs.service}
- | zkQuorum ${cliArgs.commonOpts.zkQuorum}
- | namespace ${cliArgs.commonOpts.namespace}
- | host ${cliArgs.commonOpts.host}
- | port ${cliArgs.commonOpts.port}
- | version ${cliArgs.commonOpts.version}
- | verbose ${cliArgs.commonOpts.verbose}
+ | action ${cliConfig.action}
+ | resource ${cliConfig.resource}
+ | zkQuorum ${cliConfig.commonOpts.zkQuorum}
+ | namespace ${cliConfig.commonOpts.namespace}
+ | host ${cliConfig.commonOpts.host}
+ | port ${cliConfig.commonOpts.port}
+ | version ${cliConfig.commonOpts.version}
+ | verbose ${cliConfig.commonOpts.verbose}
""".stripMargin
- case ServiceControlObject.ENGINE =>
+ case ControlObject.ENGINE =>
s"""Parsed arguments:
- | action ${cliArgs.action}
- | service ${cliArgs.service}
- | zkQuorum ${cliArgs.commonOpts.zkQuorum}
- | namespace ${cliArgs.commonOpts.namespace}
- | user ${cliArgs.engineOpts.user}
- | host ${cliArgs.commonOpts.host}
- | port ${cliArgs.commonOpts.port}
- | version ${cliArgs.commonOpts.version}
- | verbose ${cliArgs.commonOpts.verbose}
+ | action ${cliConfig.action}
+ | resource ${cliConfig.resource}
+ | zkQuorum ${cliConfig.commonOpts.zkQuorum}
+ | namespace ${cliConfig.commonOpts.namespace}
+ | user ${cliConfig.engineOpts.user}
+ | host ${cliConfig.commonOpts.host}
+ | port ${cliConfig.commonOpts.port}
+ | version ${cliConfig.commonOpts.version}
+ | verbose ${cliConfig.commonOpts.verbose}
""".stripMargin
case _ => ""
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala
new file mode 100644
index 000000000..1bde0e6cf
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ctl
+
+import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
+
+object CtlConf {
+
+ private def buildConf(key: String): ConfigBuilder = KyuubiConf.buildConf(key)
+
+ val CTL_REST_CLIENT_BASE_URL: OptionalConfigEntry[String] =
+ buildConf("kyuubi.ctl.rest.base.url")
+ .doc("The REST API base URL, " +
+ "which contains the scheme (http:// or https://), host name, port number")
+ .version("1.6.0")
+ .stringConf
+ .createOptional
+
+ val CTL_REST_CLIENT_AUTH_SCHEMA: ConfigEntry[String] =
+ buildConf("kyuubi.ctl.rest.auth.schema")
+ .doc("The authentication schema. Valid values are: basic, spnego.")
+ .version("1.6.0")
+ .stringConf
+ .createWithDefault("basic")
+
+ val CTL_REST_CLIENT_SPNEGO_HOST: OptionalConfigEntry[String] =
+ buildConf("kyuubi.ctl.rest.spnego.host")
+ .doc("When auth schema is spnego, need to config spnego host.")
+ .version("1.6.0")
+ .stringConf
+ .createOptional
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
new file mode 100644
index 000000000..3fc98d22a
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl
+
+import java.util.{Map => JMap}
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.client.KyuubiRestClient
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ctl.CtlConf._
+
+object RestClientFactory {
+
+  /** Builds a REST client, hands it to `f`, and closes the client even when `f` throws. */
+  private[ctl] def withKyuubiRestClient(
+      cliConfig: CliConfig,
+      map: JMap[String, Object],
+      conf: KyuubiConf)(f: KyuubiRestClient => Unit): Unit = {
+    val kyuubiRestClient: KyuubiRestClient = getKyuubiRestClient(cliConfig, map, conf)
+    try {
+      f(kyuubiRestClient)
+    } finally {
+      kyuubiRestClient.close()
+    }
+  }
+
+  private def getKyuubiRestClient(
+      cliConfig: CliConfig,
+      map: JMap[String, Object],
+      conf: KyuubiConf): KyuubiRestClient = {
+    // Optional conf entries are mere fallbacks, so read them with orNull instead of Option.get.
+    def required(key: String, fallback: String): String = {
+      val value = getRestConfig(key, fallback, cliConfig, map)
+      if (StringUtils.isBlank(value)) throw new KyuubiException(s"Must specify $key for rest api")
+      value
+    }
+    val version = getApiVersion(map)
+    val hostUrl = required("hostUrl", conf.get(CTL_REST_CLIENT_BASE_URL).orNull)
+    val authSchema =
+      getRestConfig("authSchema", conf.get(CTL_REST_CLIENT_AUTH_SCHEMA), cliConfig, map)
+    authSchema.toLowerCase match {
+      case "basic" =>
+        KyuubiRestClient.builder(hostUrl)
+          .apiVersion(KyuubiRestClient.ApiVersion.valueOf(version))
+          .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC)
+          .username(getRestConfig("username", null, cliConfig, map))
+          .password(cliConfig.commonOpts.password)
+          .build()
+      case "spnego" =>
+        val spnegoHost = required("spnegoHost", conf.get(CTL_REST_CLIENT_SPNEGO_HOST).orNull)
+        KyuubiRestClient.builder(hostUrl)
+          .apiVersion(KyuubiRestClient.ApiVersion.valueOf(version))
+          .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.SPNEGO)
+          .spnegoHost(spnegoHost)
+          .build()
+      case _ => throw new KyuubiException(s"Unsupported auth schema: $authSchema")
+    }
+  }
+
+  /** Resolves the REST API version name from `map`, defaulting to "V1". */
+  private def getApiVersion(map: JMap[String, Object]): String = {
+    // Both `map` and its "apiVersion" entry may be null; Option(...) keeps the lookup null-safe
+    // so that "V1" is used whenever no non-blank version is configured.
+    Option(map)
+      .flatMap(m => Option(m.get("apiVersion").asInstanceOf[String]))
+      .filter(v => StringUtils.isNotBlank(v))
+      .map(_.toUpperCase)
+      .getOrElse("V1")
+  }
+
+  /**
+   * Resolves `key` by precedence: command line option, then `map` entry, then `defaultValue`.
+   * Keys other than hostUrl, authSchema, username and spnegoHost have no command line source.
+   */
+  private def getRestConfig(
+      key: String,
+      defaultValue: String,
+      cliConfig: CliConfig,
+      map: JMap[String, Object]): String = {
+    // get value from command line
+    val commonOpts = cliConfig.commonOpts
+    val cliValue: String = key match {
+      case "hostUrl" => commonOpts.hostUrl
+      case "authSchema" => commonOpts.authSchema
+      case "username" => commonOpts.username
+      case "spnegoHost" => commonOpts.spnegoHost
+      case _ => null
+    }
+
+    // get value from map, only when the command line did not provide one
+    val resolved =
+      if (StringUtils.isBlank(cliValue) && map != null) map.get(key).asInstanceOf[String]
+      else cliValue
+
+    // get value from default
+    if (StringUtils.isBlank(resolved)) defaultValue else resolved
+  }
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
index 0850ff400..f0d0ca15a 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
@@ -16,82 +16,43 @@
*/
package org.apache.kyuubi.ctl.cmd
-import java.net.InetAddress
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
+import java.nio.charset.StandardCharsets
+import java.util.HashMap
+
+import org.yaml.snakeyaml.Yaml
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUBDOMAIN, ENGINE_TYPE}
-import org.apache.kyuubi.ctl.{CliConfig, ServiceControlObject}
+import org.apache.kyuubi.ctl.CliConfig
import org.apache.kyuubi.ctl.ControlCli.printMessage
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths, ServiceNodeInfo}
-abstract class Command(var cliArgs: CliConfig) extends Logging {
+abstract class Command(cliConfig: CliConfig) extends Logging {
+
+ protected val DEFAULT_LOG_QUERY_INTERVAL: Int = 1000
val conf = KyuubiConf().loadFileDefaults()
- val verbose = cliArgs.commonOpts.verbose
+ val verbose = cliConfig.commonOpts.verbose
- def preProcess(): Unit = {
- this.cliArgs = useDefaultPropertyValueIfMissing()
- validateArguments()
- }
+ val normalizedCliConfig: CliConfig = useDefaultPropertyValueIfMissing()
/** Ensure that required fields exists. Call this only once all defaults are loaded. */
- def validateArguments(): Unit
+ def validate(): Unit
def run(): Unit
def fail(msg: String): Unit = throw new KyuubiException(msg)
- protected def validateZkArguments(): Unit = {
- if (cliArgs.commonOpts.zkQuorum == null) {
- fail("Zookeeper quorum is not specified and no default value to load")
- }
- if (cliArgs.commonOpts.namespace == null) {
- fail("Zookeeper namespace is not specified and no default value to load")
- }
- }
-
- protected def validateHostAndPort(): Unit = {
- if (cliArgs.commonOpts.host == null) {
- fail("Must specify host for service")
- }
- if (cliArgs.commonOpts.port == null) {
- fail("Must specify port for service")
- }
-
- try {
- InetAddress.getByName(cliArgs.commonOpts.host)
- } catch {
- case _: Exception =>
- fail(s"Unknown host: ${cliArgs.commonOpts.host}")
- }
-
- try {
- if (cliArgs.commonOpts.port.toInt <= 0) {
- fail(s"Specified port should be a positive number")
- }
- } catch {
- case _: NumberFormatException =>
- fail(s"Specified port is not a valid integer number: ${cliArgs.commonOpts.port}")
- }
- }
-
- protected def validateUser(): Unit = {
- if (cliArgs.service == ServiceControlObject.ENGINE && cliArgs.engineOpts.user == null) {
- fail("Must specify user name for engine, please use -u or --user.")
- }
- }
-
protected def mergeArgsIntoKyuubiConf(): Unit = {
- conf.set(HA_ADDRESSES.key, cliArgs.commonOpts.zkQuorum)
- conf.set(HA_NAMESPACE.key, cliArgs.commonOpts.namespace)
+ conf.set(HA_ADDRESSES.key, normalizedCliConfig.commonOpts.zkQuorum)
+ conf.set(HA_NAMESPACE.key, normalizedCliConfig.commonOpts.namespace)
}
private def useDefaultPropertyValueIfMissing(): CliConfig = {
- var arguments: CliConfig = cliArgs.copy()
- if (cliArgs.commonOpts.zkQuorum == null) {
+ var arguments: CliConfig = cliConfig.copy()
+ if (cliConfig.commonOpts.zkQuorum == null) {
conf.getOption(HA_ADDRESSES.key).foreach { v =>
if (verbose) {
super.info(s"Zookeeper quorum is not specified, use value from default conf:$v")
@@ -118,41 +79,24 @@ abstract class Command(var cliArgs: CliConfig) extends Logging {
arguments
}
- private[ctl] def getZkNamespace(): String = {
- cliArgs.service match {
- case ServiceControlObject.SERVER =>
- DiscoveryPaths.makePath(null, cliArgs.commonOpts.namespace)
- case ServiceControlObject.ENGINE =>
- val engineType = Some(cliArgs.engineOpts.engineType)
- .filter(_ != null).filter(_.nonEmpty)
- .getOrElse(conf.get(ENGINE_TYPE))
- val engineSubdomain = Some(cliArgs.engineOpts.engineSubdomain)
- .filter(_ != null).filter(_.nonEmpty)
- .getOrElse(conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default"))
- val engineShareLevel = Some(cliArgs.engineOpts.engineShareLevel)
- .filter(_ != null).filter(_.nonEmpty)
- .getOrElse(conf.get(ENGINE_SHARE_LEVEL))
- // The path of the engine defined in zookeeper comes from
- // org.apache.kyuubi.engine.EngineRef#engineSpace
- DiscoveryPaths.makePath(
- s"${cliArgs.commonOpts.namespace}_${cliArgs.commonOpts.version}_" +
- s"${engineShareLevel}_${engineType}",
- cliArgs.engineOpts.user,
- Array(engineSubdomain))
- }
- }
+ private[ctl] def readConfig(): HashMap[String, Object] = {
+ var filename = normalizedCliConfig.createOpts.filename
- private[ctl] def getServiceNodes(
- discoveryClient: DiscoveryClient,
- znodeRoot: String,
- hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
- val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
- hostPortOpt match {
- case Some((host, port)) => serviceNodes.filter { sn =>
- sn.host == host && sn.port == port
- }
- case _ => serviceNodes
+ var map: HashMap[String, Object] = null
+ var br: BufferedReader = null
+ try {
+ val yaml = new Yaml()
+ val input = new FileInputStream(new File(filename))
+ br = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
+ map = yaml.load(br).asInstanceOf[HashMap[String, Object]]
+ } catch {
+ case e: Exception => fail(s"Failed to read yaml file[$filename]: $e")
+ } finally {
+ if (br != null) {
+ br.close()
+ }
}
+ map
}
override def info(msg: => Any): Unit = printMessage(msg)
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/ListCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/ListCommand.scala
deleted file mode 100644
index 6c539dc22..000000000
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/ListCommand.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kyuubi.ctl.cmd
-
-import org.apache.kyuubi.ctl.{CliConfig, Render, ServiceControlObject}
-import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
-
-class ListCommand(cliConfig: CliConfig) extends Command(cliConfig) {
-
- override def validateArguments(): Unit = {
- validateZkArguments()
- cliArgs.service match {
- case ServiceControlObject.ENGINE => validateUser()
- case _ =>
- }
- mergeArgsIntoKyuubiConf()
- }
-
- override def run(): Unit = {
- list(filterHostPort = false)
- }
-
- /**
- * List Kyuubi server nodes info.
- */
- private def list(filterHostPort: Boolean): Unit = {
- withDiscoveryClient(conf) { discoveryClient =>
- val znodeRoot = getZkNamespace()
- val hostPortOpt =
- if (filterHostPort) {
- Some((cliArgs.commonOpts.host, cliArgs.commonOpts.port.toInt))
- } else None
- val nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
-
- val title = "Zookeeper service nodes"
- info(Render.renderServiceNodesInfo(title, nodes, verbose))
- }
- }
-}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala
new file mode 100644
index 000000000..2313cce8d
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.cmd.create
+
+import java.util.{ArrayList, HashMap}
+
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
+
+/**
+ * Creates a batch through the Kyuubi REST API from a YAML description file.
+ */
+class CreateBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+
+  /** Delegates to `Validator.validateFilename` with the normalized CLI configuration. */
+  def validate(): Unit = {
+    Validator.validateFilename(normalizedCliConfig)
+  }
+
+  /**
+   * Loads the YAML file as a map, builds a `BatchRequest` from its "batchType" and
+   * "request" entries, submits it and prints the rendered batch.
+   *
+   * Fails with a `KyuubiException` (via `fail`) when the loaded map is null or has no
+   * "request" section, instead of surfacing a bare NullPointerException.
+   */
+  def run(): Unit = {
+    val map = CtlUtils.loadYamlAsMap(normalizedCliConfig)
+
+    withKyuubiRestClient(normalizedCliConfig, map, conf) { kyuubiRestClient =>
+      val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
+
+      // NOTE(review): presumably loadYamlAsMap can yield null for an empty document;
+      // the null check on `map` is defensive -- confirm against CtlUtils.
+      if (map == null || map.get("request") == null) {
+        fail("Missing 'request' section in batch yaml file" +
+          s"[${normalizedCliConfig.createOpts.filename}]")
+      }
+
+      val request = map.get("request").asInstanceOf[HashMap[String, Object]]
+      val batchRequest = new BatchRequest(
+        map.get("batchType").asInstanceOf[String],
+        request.get("resource").asInstanceOf[String],
+        request.get("className").asInstanceOf[String],
+        request.get("name").asInstanceOf[String],
+        request.get("configs").asInstanceOf[HashMap[String, String]],
+        request.get("args").asInstanceOf[ArrayList[String]])
+
+      val batch: Batch = batchRestApi.createBatch(batchRequest)
+      info(Render.renderBatchInfo(batch))
+    }
+  }
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/CreateCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateServerCommand.scala
similarity index 75%
rename from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/CreateCommand.scala
rename to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateServerCommand.scala
index 63bf7d092..1dbd1ed46 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/CreateCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateServerCommand.scala
@@ -14,26 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl.cmd
+package org.apache.kyuubi.ctl.cmd.create
import scala.collection.mutable.ListBuffer
-import org.apache.kyuubi.ctl.{CliConfig, Render, ServiceControlObject}
+import org.apache.kyuubi.ctl.{CliConfig, ControlObject}
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
-class CreateCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+class CreateServerCommand(cliConfig: CliConfig) extends Command(cliConfig) {
- def validateArguments(): Unit = {
- if (cliArgs.service != ServiceControlObject.SERVER) {
+ def validate(): Unit = {
+ if (normalizedCliConfig.resource != ControlObject.SERVER) {
fail("Only support expose Kyuubi server instance to another domain")
}
- validateZkArguments()
+
+ Validator.validateZkArguments(normalizedCliConfig)
val defaultNamespace = conf.getOption(HA_NAMESPACE.key)
.getOrElse(HA_NAMESPACE.defaultValStr)
- if (defaultNamespace.equals(cliArgs.commonOpts.namespace)) {
+ if (defaultNamespace.equals(normalizedCliConfig.commonOpts.namespace)) {
fail(
s"""
|Only support expose Kyuubi server instance to another domain, a different namespace
@@ -43,21 +46,17 @@ class CreateCommand(cliConfig: CliConfig) extends Command(cliConfig) {
}
- def run(): Unit = {
- create()
- }
-
/**
* Expose Kyuubi server instance to another domain.
*/
- private def create(): Unit = {
+ def run(): Unit = {
val kyuubiConf = conf
- kyuubiConf.setIfMissing(HA_ADDRESSES, cliArgs.commonOpts.zkQuorum)
+ kyuubiConf.setIfMissing(HA_ADDRESSES, normalizedCliConfig.commonOpts.zkQuorum)
withDiscoveryClient(kyuubiConf) { discoveryClient =>
val fromNamespace =
DiscoveryPaths.makePath(null, kyuubiConf.get(HA_NAMESPACE))
- val toNamespace = getZkNamespace()
+ val toNamespace = CtlUtils.getZkNamespace(kyuubiConf, normalizedCliConfig)
val currentServerNodes = discoveryClient.getServiceNodesInfo(fromNamespace)
val exposedServiceNodes = ListBuffer[ServiceNodeInfo]()
@@ -69,7 +68,7 @@ class CreateCommand(cliConfig: CliConfig) extends Command(cliConfig) {
s" from $fromNamespace to $toNamespace")
val newNodePath = zc.createAndGetServiceNode(
kyuubiConf,
- cliArgs.commonOpts.namespace,
+ normalizedCliConfig.commonOpts.namespace,
sn.instance,
sn.version,
true)
@@ -79,10 +78,10 @@ class CreateCommand(cliConfig: CliConfig) extends Command(cliConfig) {
}
}
- if (kyuubiConf.get(HA_ADDRESSES) == cliArgs.commonOpts.zkQuorum) {
+ if (kyuubiConf.get(HA_ADDRESSES) == normalizedCliConfig.commonOpts.zkQuorum) {
doCreate(discoveryClient)
} else {
- kyuubiConf.set(HA_ADDRESSES, cliArgs.commonOpts.zkQuorum)
+ kyuubiConf.set(HA_ADDRESSES, normalizedCliConfig.commonOpts.zkQuorum)
withDiscoveryClient(kyuubiConf)(doCreate)
}
}
@@ -91,4 +90,5 @@ class CreateCommand(cliConfig: CliConfig) extends Command(cliConfig) {
info(Render.renderServiceNodesInfo(title, exposedServiceNodes, verbose))
}
}
+
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
similarity index 51%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
index 39dd618d3..e871d311d 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
@@ -14,20 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.delete
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.util.JsonUtil
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
-object Render {
+class DeleteBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
- private[ctl] def renderServiceNodesInfo(
- title: String,
- serviceNodeInfo: Seq[ServiceNodeInfo],
- verbose: Boolean): String = {
- val header = Seq("Namespace", "Host", "Port", "Version")
- val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
- Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
+ var result: String = null
+
+ def validate(): Unit = {}
+
+ def run(): Unit = {
+ withKyuubiRestClient(normalizedCliConfig, null, conf) { kyuubiRestClient =>
+ val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
+
+ val result = batchRestApi.deleteBatch(
+ normalizedCliConfig.batchOpts.batchId,
+ normalizedCliConfig.batchOpts.hs2ProxyUser)
+ info(JsonUtil.toJson(result))
}
- Tabulator.format(title, header, rows, verbose)
}
+
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/DeleteCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteCommand.scala
similarity index 73%
rename from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/DeleteCommand.scala
rename to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteCommand.scala
index d76f65bc3..e4a9bbbfe 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/DeleteCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteCommand.scala
@@ -14,35 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl.cmd
+package org.apache.kyuubi.ctl.cmd.delete
import scala.collection.mutable.ListBuffer
-import org.apache.kyuubi.ctl.{CliConfig, Render}
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.ha.client.ServiceNodeInfo
class DeleteCommand(cliConfig: CliConfig) extends Command(cliConfig) {
- def validateArguments(): Unit = {
- validateZkArguments()
- validateHostAndPort()
- validateUser()
+ def validate(): Unit = {
+ Validator.validateZkArguments(normalizedCliConfig)
+ Validator.validateHostAndPort(normalizedCliConfig)
mergeArgsIntoKyuubiConf()
}
- override def run(): Unit = {
- delete()
- }
-
/**
* Delete zookeeper service node with specified host port.
*/
- private def delete(): Unit = {
+ def run(): Unit = {
withDiscoveryClient(conf) { discoveryClient =>
- val znodeRoot = getZkNamespace()
- val hostPortOpt = Some((cliArgs.commonOpts.host, cliArgs.commonOpts.port.toInt))
- val nodesToDelete = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
+ val znodeRoot = CtlUtils.getZkNamespace(conf, normalizedCliConfig)
+ val hostPortOpt =
+ Some((normalizedCliConfig.commonOpts.host, normalizedCliConfig.commonOpts.port.toInt))
+ val nodesToDelete = CtlUtils.getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
val deletedNodes = ListBuffer[ServiceNodeInfo]()
nodesToDelete.foreach { node =>
@@ -61,4 +59,5 @@ class DeleteCommand(cliConfig: CliConfig) extends Command(cliConfig) {
info(Render.renderServiceNodesInfo(title, deletedNodes, verbose))
}
}
+
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteEngineCommand.scala
similarity index 62%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteEngineCommand.scala
index 39dd618d3..d0b0067a1 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteEngineCommand.scala
@@ -14,20 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.delete
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
-object Render {
+class DeleteEngineCommand(cliConfig: CliConfig) extends DeleteCommand(cliConfig) {
- private[ctl] def renderServiceNodesInfo(
- title: String,
- serviceNodeInfo: Seq[ServiceNodeInfo],
- verbose: Boolean): String = {
- val header = Seq("Namespace", "Host", "Port", "Version")
- val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
- Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
+ override def validate(): Unit = {
+ super.validate()
+
+ // validate user
+ if (normalizedCliConfig.engineOpts.user == null) {
+ fail("Must specify user name for engine, please use -u or --user.")
}
- Tabulator.format(title, header, rows, verbose)
}
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteServerCommand.scala
similarity index 61%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteServerCommand.scala
index 39dd618d3..fea1014a2 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteServerCommand.scala
@@ -14,20 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.delete
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
-object Render {
-
- private[ctl] def renderServiceNodesInfo(
- title: String,
- serviceNodeInfo: Seq[ServiceNodeInfo],
- verbose: Boolean): String = {
- val header = Seq("Namespace", "Host", "Port", "Version")
- val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
- Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
- }
- Tabulator.format(title, header, rows, verbose)
- }
-}
+class DeleteServerCommand(cliConfig: CliConfig) extends DeleteCommand(cliConfig) {}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetBatchCommand.scala
new file mode 100644
index 000000000..590f06463
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetBatchCommand.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.cmd.get
+
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.api.v1.dto.Batch
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.Render
+
+/**
+ * Fetches a single batch by id through the Kyuubi REST API and prints it.
+ */
+class GetBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+
+  /** Rejects the command when no batch id was supplied. */
+  def validate(): Unit = {
+    val missingBatchId = normalizedCliConfig.batchOpts.batchId == null
+    if (missingBatchId) fail("Must specify batchId for get batch command.")
+  }
+
+  /** Looks the batch up by its id and prints the rendered result. */
+  def run(): Unit = {
+    withKyuubiRestClient(normalizedCliConfig, null, conf) { client =>
+      val api: BatchRestApi = new BatchRestApi(client)
+      val batchId = normalizedCliConfig.batchOpts.batchId
+
+      val found: Batch = api.getBatchById(batchId)
+      info(Render.renderBatchInfo(found))
+    }
+  }
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/GetCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetCommand.scala
similarity index 52%
rename from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/GetCommand.scala
rename to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetCommand.scala
index 745495504..49b2ca442 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/GetCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetCommand.scala
@@ -14,34 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl.cmd
+package org.apache.kyuubi.ctl.cmd.get
-import org.apache.kyuubi.ctl.{CliConfig, Render}
-import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
class GetCommand(cliConfig: CliConfig) extends Command(cliConfig) {
- override def validateArguments(): Unit = {
- validateZkArguments()
- validateHostAndPort()
- validateUser()
+ def validate(): Unit = {
+ Validator.validateZkArguments(normalizedCliConfig)
+ Validator.validateHostAndPort(normalizedCliConfig)
mergeArgsIntoKyuubiConf()
}
- override def run(): Unit = list(filterHostPort = true)
-
- private def list(filterHostPort: Boolean): Unit = {
- withDiscoveryClient(conf) { discoveryClient =>
- val znodeRoot = getZkNamespace()
- val hostPortOpt =
- if (filterHostPort) {
- Some((cliArgs.commonOpts.host, cliArgs.commonOpts.port.toInt))
- } else None
- val nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
-
- val title = "Zookeeper service nodes"
- info(Render.renderServiceNodesInfo(title, nodes, verbose))
- }
+ def run(): Unit = {
+ val nodes = CtlUtils.listZkServerNodes(conf, normalizedCliConfig, filterHostPort = true)
+ val title = "Zookeeper service nodes"
+ info(Render.renderServiceNodesInfo(title, nodes, verbose))
}
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetEngineCommand.scala
similarity index 62%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetEngineCommand.scala
index 39dd618d3..17557ceb6 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetEngineCommand.scala
@@ -14,20 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.get
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
-object Render {
+class GetEngineCommand(cliConfig: CliConfig) extends GetCommand(cliConfig) {
- private[ctl] def renderServiceNodesInfo(
- title: String,
- serviceNodeInfo: Seq[ServiceNodeInfo],
- verbose: Boolean): String = {
- val header = Seq("Namespace", "Host", "Port", "Version")
- val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
- Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
+ override def validate(): Unit = {
+ super.validate()
+
+ // validate user
+ if (normalizedCliConfig.engineOpts.user == null) {
+ fail("Must specify user name for engine, please use -u or --user.")
}
- Tabulator.format(title, header, rows, verbose)
}
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetServerCommand.scala
similarity index 61%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetServerCommand.scala
index 39dd618d3..fd0f52bd9 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetServerCommand.scala
@@ -14,20 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.get
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
-object Render {
-
- private[ctl] def renderServiceNodesInfo(
- title: String,
- serviceNodeInfo: Seq[ServiceNodeInfo],
- verbose: Boolean): String = {
- val header = Seq("Namespace", "Host", "Port", "Version")
- val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
- Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
- }
- Tabulator.format(title, header, rows, verbose)
- }
-}
+class GetServerCommand(cliConfig: CliConfig) extends GetCommand(cliConfig) {}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala
new file mode 100644
index 000000000..aa4608dbf
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.cmd.list
+
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.api.v1.dto.GetBatchesResponse
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.Render
+
+/**
+ * Lists batches through the Kyuubi REST API, filtered by the batch options given on
+ * the command line.
+ */
+class ListBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+
+  /**
+   * Checks the time filters: neither may be negative, and when endTime is set
+   * (non-zero) createTime must not exceed it.
+   */
+  def validate(): Unit = {
+    val opts = normalizedCliConfig.batchOpts
+    if (opts.createTime < 0) {
+      fail(s"Invalid createTime, negative milliseconds are not supported.")
+    }
+    if (opts.endTime < 0) {
+      fail(s"Invalid endTime, negative milliseconds are not supported.")
+    }
+    val endTimeSet = opts.endTime != 0
+    if (endTimeSet && opts.createTime > opts.endTime) {
+      fail(s"Invalid createTime/endTime, createTime should be less or equal to endTime.")
+    }
+  }
+
+  /** Queries the batch list and prints the rendered response. */
+  def run(): Unit = {
+    withKyuubiRestClient(normalizedCliConfig, null, conf) { client =>
+      val api: BatchRestApi = new BatchRestApi(client)
+      val opts = normalizedCliConfig.batchOpts
+      // A negative offset is clamped to the start of the list.
+      val offset = if (opts.from < 0) 0 else opts.from
+
+      val response: GetBatchesResponse = api.listBatches(
+        opts.batchType,
+        opts.batchUser,
+        opts.batchState,
+        opts.createTime,
+        opts.endTime,
+        offset,
+        opts.size)
+
+      info(Render.renderBatchListInfo(response))
+    }
+  }
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListCommand.scala
similarity index 57%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListCommand.scala
index 39dd618d3..ba44f7b51 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListCommand.scala
@@ -14,20 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.list
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
-object Render {
+class ListCommand(cliConfig: CliConfig) extends Command(cliConfig) {
- private[ctl] def renderServiceNodesInfo(
- title: String,
- serviceNodeInfo: Seq[ServiceNodeInfo],
- verbose: Boolean): String = {
- val header = Seq("Namespace", "Host", "Port", "Version")
- val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
- Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
- }
- Tabulator.format(title, header, rows, verbose)
+ def validate(): Unit = {
+ Validator.validateZkArguments(normalizedCliConfig)
+ mergeArgsIntoKyuubiConf()
}
+
+ def run(): Unit = {
+ val nodes = CtlUtils.listZkServerNodes(conf, normalizedCliConfig, filterHostPort = false)
+
+ val title = "Zookeeper service nodes"
+ info(Render.renderServiceNodesInfo(title, nodes, verbose))
+ }
+
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListEngineCommand.scala
similarity index 62%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListEngineCommand.scala
index 39dd618d3..1cad9ac23 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListEngineCommand.scala
@@ -14,20 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.list
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
-object Render {
+/**
+ * `list engine` command: reuses the listing behaviour of [[ListCommand]] and additionally
+ * requires an engine user to be supplied.
+ */
+class ListEngineCommand(cliConfig: CliConfig) extends ListCommand(cliConfig) {
- private[ctl] def renderServiceNodesInfo(
- title: String,
- serviceNodeInfo: Seq[ServiceNodeInfo],
- verbose: Boolean): String = {
- val header = Seq("Namespace", "Host", "Port", "Version")
- val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
- Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
+ // Runs the parent validation first, then rejects a missing engine user.
+ override def validate(): Unit = {
+ super.validate()
+
+ // The engine user is mandatory: CtlUtils.getZkNamespace passes it to DiscoveryPaths.makePath
+ // when building the engine path.
+ if (normalizedCliConfig.engineOpts.user == null) {
+ fail("Must specify user name for engine, please use -u or --user.")
}
- Tabulator.format(title, header, rows, verbose)
}
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListServerCommand.scala
similarity index 61%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListServerCommand.scala
index 39dd618d3..cea56e70c 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListServerCommand.scala
@@ -14,20 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.list
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
-object Render {
-
- private[ctl] def renderServiceNodesInfo(
- title: String,
- serviceNodeInfo: Seq[ServiceNodeInfo],
- verbose: Boolean): String = {
- val header = Seq("Namespace", "Host", "Port", "Version")
- val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
- Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
- }
- Tabulator.format(title, header, rows, verbose)
- }
-}
+/** `list server` command: adds nothing of its own, all behaviour is inherited from ListCommand. */
+class ListServerCommand(cliConfig: CliConfig) extends ListCommand(cliConfig) {}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
new file mode 100644
index 000000000..40ad5ec8c
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.cmd.log
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.api.v1.dto.{Batch, OperationLog}
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
+
+/**
+ * `log batch` command: prints the local operation log of a batch job through the Kyuubi REST
+ * API and, when the forward option is set, keeps following the log until the batch has left
+ * the PENDING/RUNNING states and no more rows are returned.
+ */
+class LogBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+
+  /** Fails when no batch id was supplied. */
+  def validate(): Unit = {
+    if (normalizedCliConfig.batchOpts.batchId == null) {
+      fail("Must specify batchId for log batch command.")
+    }
+  }
+
+  /**
+   * Prints one page of log rows starting at the requested offset, then optionally follows
+   * the log page by page.
+   */
+  def run(): Unit = {
+    withKyuubiRestClient(normalizedCliConfig, null, conf) { kyuubiRestClient =>
+      val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
+      val batchId = normalizedCliConfig.batchOpts.batchId
+      val requestedFrom = normalizedCliConfig.batchOpts.from
+      val size = normalizedCliConfig.batchOpts.size
+
+      val firstPage: OperationLog = batchRestApi.getBatchLocalLog(batchId, requestedFrom, size)
+      firstPage.getLogRowSet.asScala.foreach(x => info(x))
+
+      if (normalizedCliConfig.logOpts.forward) {
+        // A negative requested offset counts as 0 when computing where the next page starts.
+        var from = (if (requestedFrom < 0) 0 else requestedFrom) + firstPage.getLogRowSet.size
+        var done = false
+        while (!done) {
+          // Sample the state BEFORE fetching the page: an empty page read after the batch was
+          // already seen in a terminal state means nothing more will arrive. Checking the
+          // state after the fetch could drop rows written between the two calls.
+          val batch: Batch = batchRestApi.getBatchById(batchId)
+          val finished = batch.getState() != "PENDING" && batch.getState() != "RUNNING"
+
+          val log: OperationLog = batchRestApi.getBatchLocalLog(batchId, from, size)
+          from += log.getLogRowSet.size
+          log.getLogRowSet.asScala.foreach(x => info(x))
+
+          if (finished && log.getLogRowSet.size() == 0) {
+            done = true
+          } else {
+            // Only wait when another poll is actually going to happen.
+            Thread.sleep(DEFAULT_LOG_QUERY_INTERVAL)
+          }
+        }
+      }
+    }
+  }
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
new file mode 100644
index 000000000..956aaf5d1
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.cmd.submit
+
+import java.util.{ArrayList, HashMap}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest, OperationLog}
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Validator}
+
+/**
+ * `submit batch` command: creates a batch job from a YAML description through the Kyuubi REST
+ * API and then follows its local operation log until the batch has left the PENDING/RUNNING
+ * states and no more log rows are returned.
+ */
+class SubmitBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+
+  /** Checks that a config file was given and exists. */
+  def validate(): Unit = {
+    Validator.validateFilename(normalizedCliConfig)
+  }
+
+  /**
+   * Loads the YAML file, submits the batch described by its `batchType` key and `request`
+   * section, and prints the batch log page by page.
+   */
+  def run(): Unit = {
+    val map = CtlUtils.loadYamlAsMap(normalizedCliConfig)
+
+    withKyuubiRestClient(normalizedCliConfig, map, conf) { kyuubiRestClient =>
+      val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
+
+      val request =
+        if (map == null) null else map.get("request").asInstanceOf[HashMap[String, Object]]
+      // Report an empty file or a missing `request` section explicitly instead of letting the
+      // lookups below end in a bare NullPointerException.
+      if (request == null) {
+        fail("No 'request' section found in batch config file: " +
+          s"${normalizedCliConfig.createOpts.filename}.")
+      }
+      val batchRequest = new BatchRequest(
+        map.get("batchType").asInstanceOf[String],
+        request.get("resource").asInstanceOf[String],
+        request.get("className").asInstanceOf[String],
+        request.get("name").asInstanceOf[String],
+        request.get("configs").asInstanceOf[HashMap[String, String]],
+        request.get("args").asInstanceOf[ArrayList[String]])
+
+      val batchId = batchRestApi.createBatch(batchRequest).getId
+      val size = normalizedCliConfig.batchOpts.size
+      var from = 0
+      var done = false
+      while (!done) {
+        // Sample the state BEFORE fetching the page: an empty page read after the batch was
+        // already seen in a terminal state means nothing more will arrive. Checking the state
+        // after the fetch could drop rows written between the two calls.
+        val batch: Batch = batchRestApi.getBatchById(batchId)
+        val finished = batch.getState() != "PENDING" && batch.getState() != "RUNNING"
+
+        val log: OperationLog = batchRestApi.getBatchLocalLog(batchId, from, size)
+        from += log.getLogRowSet.size
+        log.getLogRowSet.asScala.foreach(x => info(x))
+
+        if (finished && log.getLogRowSet.size() == 0) {
+          done = true
+        } else {
+          // Only wait when another poll is actually going to happen.
+          Thread.sleep(DEFAULT_LOG_QUERY_INTERVAL)
+        }
+      }
+    }
+  }
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CommandLineUtils.scala
similarity index 55%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CommandLineUtils.scala
index 39dd618d3..7d2a29b1e 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CommandLineUtils.scala
@@ -14,20 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+package org.apache.kyuubi.ctl.util
-object Render {
+import java.io.PrintStream
- private[ctl] def renderServiceNodesInfo(
- title: String,
- serviceNodeInfo: Seq[ServiceNodeInfo],
- verbose: Boolean): String = {
- val header = Seq("Namespace", "Host", "Port", "Version")
- val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
- Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
- }
- Tabulator.format(title, header, rows, verbose)
- }
+/**
+ * Entry-point contract for Kyuubi Ctl command line tools: implementors provide `main`, and the
+ * exit/print hooks come from [[CommandLineLoggingUtils]]. This trait itself declares no parsing
+ * logic.
+ */
+private[kyuubi] trait CommandLineUtils extends CommandLineLoggingUtils {
+
+ // Program entry point, to be supplied by the implementing object.
+ def main(args: Array[String]): Unit
+}
+
+/**
+ * Exit and message-printing hooks for command line tools. Both hooks are mutable so that they
+ * can be swapped out, e.g. in tests.
+ */
+private[kyuubi] trait CommandLineLoggingUtils {
+
+  // Exposed for testing: invoked with the exit code; terminates the JVM by default.
+  private[kyuubi] var exitFn: Int => Unit = code => System.exit(code)
+
+  // Exposed for testing: where printMessage writes; standard error by default.
+  private[kyuubi] var printStream: PrintStream = System.err
+
+  // scalastyle:off println
+  private[kyuubi] def printMessage(msg: Any): Unit = {
+    printStream.println(msg)
+  }
+  // scalastyle:on println
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CtlUtils.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CtlUtils.scala
new file mode 100644
index 000000000..c957111d0
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CtlUtils.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.util
+
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
+import java.nio.charset.StandardCharsets
+import java.util.{Map => JMap}
+
+import org.yaml.snakeyaml.Yaml
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUBDOMAIN, ENGINE_TYPE}
+import org.apache.kyuubi.ctl.{CliConfig, ControlObject}
+import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths, ServiceNodeInfo}
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+
+/** Helpers shared by the ctl commands: Zookeeper path/node lookups and YAML loading. */
+object CtlUtils {
+
+  /**
+   * Builds the Zookeeper path to look under for the resource selected in `cliConfig`.
+   *
+   * For servers this is the plain namespace. For engines, the engine type, subdomain and share
+   * level are taken from the command line when given (neither null nor empty) and from `conf`
+   * otherwise; the subdomain finally falls back to "default". The engine path layout comes from
+   * org.apache.kyuubi.engine.EngineRef#engineSpace.
+   */
+  private[ctl] def getZkNamespace(conf: KyuubiConf, cliConfig: CliConfig): String = {
+    cliConfig.resource match {
+      case ControlObject.SERVER =>
+        DiscoveryPaths.makePath(null, cliConfig.commonOpts.namespace)
+      case ControlObject.ENGINE =>
+        val engineOpts = cliConfig.engineOpts
+        val commonOpts = cliConfig.commonOpts
+        // Option(...) turns a null command line value into None; empty strings are dropped too.
+        val engineType = Option(engineOpts.engineType)
+          .filter(_.nonEmpty)
+          .getOrElse(conf.get(ENGINE_TYPE))
+        val engineSubdomain = Option(engineOpts.engineSubdomain)
+          .filter(_.nonEmpty)
+          .getOrElse(conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default"))
+        val engineShareLevel = Option(engineOpts.engineShareLevel)
+          .filter(_.nonEmpty)
+          .getOrElse(conf.get(ENGINE_SHARE_LEVEL))
+        val parent =
+          s"${commonOpts.namespace}_${commonOpts.version}_${engineShareLevel}_${engineType}"
+        DiscoveryPaths.makePath(parent, engineOpts.user, Array(engineSubdomain))
+    }
+  }
+
+  /**
+   * Returns the service nodes found under `znodeRoot`, restricted to the given host and port
+   * when `hostPortOpt` carries one.
+   */
+  private[ctl] def getServiceNodes(
+      discoveryClient: DiscoveryClient,
+      znodeRoot: String,
+      hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
+    val allNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
+    hostPortOpt match {
+      case Some((host, port)) =>
+        allNodes.filter(node => node.host == host && node.port == port)
+      case _ =>
+        allNodes
+    }
+  }
+
+  /**
+   * List Kyuubi server nodes info.
+   *
+   * When `filterHostPort` is set, only nodes matching the host and port from the common options
+   * are returned; the port option is converted with `toInt`.
+   */
+  private[ctl] def listZkServerNodes(
+      conf: KyuubiConf,
+      cliConfig: CliConfig,
+      filterHostPort: Boolean): Seq[ServiceNodeInfo] = {
+    var found = Seq.empty[ServiceNodeInfo]
+    withDiscoveryClient(conf) { client =>
+      val root = getZkNamespace(conf, cliConfig)
+      val wanted =
+        if (!filterHostPort) {
+          None
+        } else {
+          Some((cliConfig.commonOpts.host, cliConfig.commonOpts.port.toInt))
+        }
+      found = getServiceNodes(client, root, wanted)
+    }
+    found
+  }
+
+  /**
+   * Parses the YAML file named by `cliConfig.createOpts.filename` (read as UTF-8) into a map.
+   * Any exception raised while opening or parsing is rethrown as a KyuubiException; the reader
+   * is closed in all cases.
+   */
+  private[ctl] def loadYamlAsMap(cliConfig: CliConfig): JMap[String, Object] = {
+    val filename = cliConfig.createOpts.filename
+
+    var reader: BufferedReader = null
+    try {
+      val yaml = new Yaml()
+      val stream = new FileInputStream(new File(filename))
+      reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))
+      // NOTE(review): a default-constructed Yaml is used for loading; confirm the file is always
+      // a trusted, user-supplied local config.
+      yaml.load(reader).asInstanceOf[JMap[String, Object]]
+    } catch {
+      case e: Exception => throw new KyuubiException(s"Failed to read yaml file[$filename]: $e")
+    } finally {
+      if (reader != null) {
+        reader.close()
+      }
+    }
+  }
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/DateTimeUtils.scala
similarity index 54%
rename from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
rename to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/DateTimeUtils.scala
index 39dd618d3..67ac76333 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/DateTimeUtils.scala
@@ -14,20 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+package org.apache.kyuubi.ctl.util
-object Render {
+import java.time.{Instant, LocalDateTime, ZoneId}
+import java.time.format.DateTimeFormatter
- private[ctl] def renderServiceNodesInfo(
- title: String,
- serviceNodeInfo: Seq[ServiceNodeInfo],
- verbose: Boolean): String = {
- val header = Seq("Namespace", "Host", "Port", "Version")
- val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
- Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
- }
- Tabulator.format(title, header, rows, verbose)
+/** Conversions between formatted date strings and epoch milliseconds in the system time zone. */
+private[ctl] object DateTimeUtils {
+
+  /**
+   * Parses `date` with the pattern `format` as a local date-time in the system default zone
+   * and returns the epoch milliseconds. A null `date` yields 0.
+   */
+  def dateStringToMillis(date: String, format: String): Long = {
+    if (date == null) {
+      0
+    } else {
+      val parsed = LocalDateTime.parse(date, DateTimeFormatter.ofPattern(format))
+      parsed.atZone(ZoneId.systemDefault).toInstant.toEpochMilli
+    }
+  }
+
+  /**
+   * Renders the epoch milliseconds `millis` with the pattern `format`, as a local date-time in
+   * the system default zone.
+   */
+  def millisToDateString(millis: Long, format: String): String = {
+    val pattern = DateTimeFormatter.ofPattern(format)
+    val zoned = Instant.ofEpochMilli(millis).atZone(ZoneId.systemDefault())
+    pattern.format(zoned.toLocalDateTime())
}
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Render.scala
new file mode 100644
index 000000000..210047fcf
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Render.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.client.api.v1.dto.{Batch, GetBatchesResponse}
+import org.apache.kyuubi.ctl.util.DateTimeUtils._
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
+
+/** Turns service node and batch information into printable text for the ctl commands. */
+private[ctl] object Render {
+
+ /**
+ * Renders service nodes as a table with the columns Namespace, Host, Port and Version.
+ * Rows are sorted by node name; a node without a version gets an empty Version cell.
+ * `title` and `verbose` are passed through to Tabulator.format.
+ */
+ def renderServiceNodesInfo(
+ title: String,
+ serviceNodeInfo: Seq[ServiceNodeInfo],
+ verbose: Boolean): String = {
+ val header = Seq("Namespace", "Host", "Port", "Version")
+ val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
+ Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
+ }
+ Tabulator.format(title, header, rows, verbose)
+ }
+
+ /**
+ * Renders a batch listing as a table, one row per batch, sorted by create time. The title
+ * carries the total reported by the response, and the table is always formatted with the
+ * verbose flag set to true.
+ */
+ def renderBatchListInfo(batchListInfo: GetBatchesResponse): String = {
+ val title = s"Total number of batches: ${batchListInfo.getTotal}"
+ val header =
+ Seq("Id", "Name", "User", "Type", "Instance", "State", "App Info", "Create Time", "End Time")
+ val rows = batchListInfo.getBatches.asScala.sortBy(_.getCreateTime).map { batch =>
+ Seq(
+ batch.getId,
+ batch.getName,
+ batch.getUser,
+ batch.getBatchType,
+ batch.getKyuubiInstance,
+ batch.getState,
+ // NOTE(review): assumes getBatchInfo is never null — confirm against the server response.
+ batch.getBatchInfo.toString,
+ // Times are epoch milliseconds rendered in the system default zone.
+ // NOTE(review): if an unfinished batch reports an end time of 0, this prints the epoch
+ // date — confirm.
+ millisToDateString(batch.getCreateTime, "yyyy-MM-dd HH:mm:ss"),
+ millisToDateString(batch.getEndTime, "yyyy-MM-dd HH:mm:ss"))
+ }
+ Tabulator.format(title, header, rows, true)
+ }
+
+ /**
+ * Renders the details of a single batch as a multi-line "Batch Info" text block, using the
+ * same time format as the batch listing.
+ */
+ def renderBatchInfo(batch: Batch): String = {
+ s"""Batch Info:
+ | Batch Id: ${batch.getId}
+ | Type: ${batch.getBatchType}
+ | Name: ${batch.getName}
+ | User: ${batch.getUser}
+ | State: ${batch.getState}
+ | Kyuubi Instance: ${batch.getKyuubiInstance}
+ | Create Time: ${millisToDateString(batch.getCreateTime, "yyyy-MM-dd HH:mm:ss")}
+ | End Time: ${millisToDateString(batch.getEndTime, "yyyy-MM-dd HH:mm:ss")}
+ | App Info: ${batch.getBatchInfo.toString}
+ """.stripMargin
+ }
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLineUtils.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Tabulator.scala
similarity index 83%
rename from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLineUtils.scala
rename to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Tabulator.scala
index 98714f731..eaf68d3e5 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLineUtils.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Tabulator.scala
@@ -15,32 +15,10 @@
* limitations under the License.
*/
-package org.apache.kyuubi.ctl
-
-import java.io.PrintStream
+package org.apache.kyuubi.ctl.util
import org.apache.commons.lang3.StringUtils
-/**
- * Contains basic command line parsing functionality and methods to parse some common Kyuubi Ctl
- * options.
- */
-private[kyuubi] trait CommandLineUtils extends CommandLineLoggingUtils {
-
- def main(args: Array[String]): Unit
-}
-
-private[kyuubi] trait CommandLineLoggingUtils {
- // Exposed for testing
- private[kyuubi] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
-
- private[kyuubi] var printStream: PrintStream = System.err
-
- // scalastyle:off println
- private[kyuubi] def printMessage(msg: Any): Unit = printStream.println(msg)
- // scalastyle:on println
-}
-
/** Refer the showString method of org.apache.spark.sql.Dataset */
private[kyuubi] object Tabulator {
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala
new file mode 100644
index 000000000..67fe497e1
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.util
+
+import java.net.InetAddress
+import java.nio.file.{Files, Paths}
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.ctl.CliConfig
+
+/**
+ * Argument checks shared by the ctl commands. Every check reports a problem by throwing a
+ * KyuubiException carrying a user-facing message.
+ */
+private[ctl] object Validator {
+
+  /** Requires both the Zookeeper quorum and the namespace to be set. */
+  def validateZkArguments(cliConfig: CliConfig): Unit = {
+    if (cliConfig.commonOpts.zkQuorum == null) {
+      fail("Zookeeper quorum is not specified and no default value to load")
+    }
+    if (cliConfig.commonOpts.namespace == null) {
+      fail("Zookeeper namespace is not specified and no default value to load")
+    }
+  }
+
+  /**
+   * Requires host and port to be set, the host to be resolvable through InetAddress and the
+   * port to be a positive integer.
+   */
+  def validateHostAndPort(cliConfig: CliConfig): Unit = {
+    if (cliConfig.commonOpts.host == null) {
+      fail("Must specify host for service")
+    }
+    if (cliConfig.commonOpts.port == null) {
+      fail("Must specify port for service")
+    }
+
+    try {
+      InetAddress.getByName(cliConfig.commonOpts.host)
+    } catch {
+      case _: Exception =>
+        fail(s"Unknown host: ${cliConfig.commonOpts.host}")
+    }
+
+    try {
+      if (cliConfig.commonOpts.port.toInt <= 0) {
+        fail(s"Specified port should be a positive number")
+      }
+    } catch {
+      // Only the number conversion is translated here; the failure raised just above for a
+      // non-positive port is not a NumberFormatException and propagates unchanged.
+      case _: NumberFormatException =>
+        fail(s"Specified port is not a valid integer number: " +
+          s"${cliConfig.commonOpts.port}")
+    }
+  }
+
+  /**
+   * Requires the config file name to be non-blank and to point at an existing file. Relative
+   * names are resolved against the current working directory.
+   */
+  def validateFilename(cliConfig: CliConfig): Unit = {
+    val filename = cliConfig.createOpts.filename
+    if (StringUtils.isBlank(filename)) {
+      fail(s"Config file is not specified.")
+    }
+
+    // Let java.nio decide what is absolute: a leading "/" test misclassifies absolute paths on
+    // platforms that do not use Unix-style roots.
+    val path = Paths.get(filename).toAbsolutePath
+    if (!Files.exists(path)) {
+      fail(s"Config file does not exist: ${path}.")
+    }
+  }
+
+  // Single place where validation failures are turned into exceptions.
+  private def fail(msg: String): Unit = throw new KyuubiException(msg)
+}
diff --git a/kyuubi-ctl/src/test/resources/cli/batch.yaml b/kyuubi-ctl/src/test/resources/cli/batch.yaml
new file mode 100644
index 000000000..ee3d6bdd0
--- /dev/null
+++ b/kyuubi-ctl/src/test/resources/cli/batch.yaml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: v1
+user: test_user
+request:
+ name: test_batch
+ resource: /MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar
+ className: org.apache.kyuubi.engine.spark.SparkSQLEngine
+ args:
+ - x1
+ - x2
+ configs:
+ hive.server2.proxy.user: b_user
+ wait.completion: true
+ k1: v1
+options:
+ verbose: true
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala
new file mode 100644
index 000000000..d0d41a0a4
--- /dev/null
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.ctl.util.DateTimeUtils._
+
+/**
+ * Argument parsing tests for the batch related ctl commands (create/submit/get/list/log):
+ * accepted options, default values, and the messages reported for invalid input.
+ */
+class BatchCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
+
+  private val batchYaml = "src/test/resources/cli/batch.yaml"
+  private val batchId = "f7fd702c-e54e-11ec-8fea-0242ac120002"
+  private val createLikeOps = Seq("create", "submit")
+
+  /** Parses the given command line. */
+  private def parse(args: String*): ControlCliArguments = new ControlCliArguments(args)
+
+  /** Expects parsing of `args` to exit prematurely with a message containing `message`. */
+  private def expectExit(message: String)(args: String*): Unit = {
+    testPrematureExitForControlCliArgs(args.toArray, message)
+  }
+
+  /** Like expectExit, for `list batch --batchType spark --size 4` plus the `extra` options. */
+  private def expectListExit(message: String)(extra: String*): Unit = {
+    expectExit(message)(Seq("list", "batch", "--batchType", "spark", "--size", "4") ++ extra: _*)
+  }
+
+  test("create/submit batch") {
+    createLikeOps.foreach { op =>
+      val createOpts = parse(op, "batch", "-f", batchYaml).cliConfig.createOpts
+      assert(createOpts.filename == "src/test/resources/cli/batch.yaml")
+    }
+  }
+
+  test("create/submit batch and overwrite rest config") {
+    createLikeOps.foreach { op =>
+      val commonOpts = parse(
+        op,
+        "batch",
+        "-f",
+        batchYaml,
+        "--hostUrl",
+        "https://localhost:8440",
+        "--username",
+        "test_user_1",
+        "--authSchema",
+        "spnego").cliConfig.commonOpts
+      assert(commonOpts.hostUrl == "https://localhost:8440")
+      assert(commonOpts.authSchema == "spnego")
+      assert(commonOpts.username == "test_user_1")
+    }
+  }
+
+  test("create/submit batch without filename specified") {
+    createLikeOps.foreach { op =>
+      expectExit("Config file is not specified.")(op, "batch")
+    }
+  }
+
+  test("create/submit batch with non-existed file") {
+    createLikeOps.foreach { op =>
+      expectExit("Config file does not exist")(op, "batch", "-f", "fake.yaml")
+    }
+  }
+
+  test("get batch without batch id specified") {
+    expectExit("Must specify batchId for get batch command")("get", "batch")
+  }
+
+  test("get batch") {
+    val batchOpts = parse("get", "batch", batchId).cliConfig.batchOpts
+    assert(batchOpts.batchId == "f7fd702c-e54e-11ec-8fea-0242ac120002")
+  }
+
+  test("test list batch option") {
+    val batchOpts = parse(
+      "list",
+      "batch",
+      "--batchType",
+      "spark",
+      "--batchUser",
+      "tom",
+      "--batchState",
+      "RUNNING",
+      "--createTime",
+      "20220607000000",
+      "--from",
+      "2",
+      "--size",
+      "5").cliConfig.batchOpts
+    assert(batchOpts.batchType == "spark")
+    assert(batchOpts.batchUser == "tom")
+    assert(batchOpts.batchState == "RUNNING")
+    assert(batchOpts.createTime ==
+      dateStringToMillis("20220607000000", "yyyyMMddHHmmss"))
+    assert(batchOpts.endTime == 0)
+    assert(batchOpts.from == 2)
+    assert(batchOpts.size == 5)
+  }
+
+  test("test list batch default option") {
+    val batchOpts = parse("list", "batch").cliConfig.batchOpts
+    assert(batchOpts.batchType == null)
+    assert(batchOpts.from == -1)
+    assert(batchOpts.size == 100)
+  }
+
+  test("test bad list batch option - size") {
+    expectExit("Option --size must be >=0")(
+      "list",
+      "batch",
+      "--batchType",
+      "spark",
+      "--size",
+      "-4")
+  }
+
+  test("test bad list batch option - create date format") {
+    expectListExit("Option --createTime must be in yyyyMMddHHmmss format.")(
+      "--createTime",
+      "20220101")
+  }
+
+  test("test bad list batch option - end date format") {
+    expectListExit("Option --endTime must be in yyyyMMddHHmmss format.")(
+      "--endTime",
+      "20220101")
+  }
+
+  test("test bad list batch option - negative create date") {
+    expectListExit("Invalid createTime, negative milliseconds are not supported.")(
+      "--createTime",
+      "19690101000000")
+  }
+
+  test("test bad list batch option - negative end date") {
+    expectListExit("Invalid endTime, negative milliseconds are not supported.")(
+      "--endTime",
+      "19690101000000")
+  }
+
+  test("test bad list batch option - createTime > endTime") {
+    expectListExit(
+      "Invalid createTime/endTime, " +
+        "createTime should be less or equal to endTime.")(
+      "--createTime",
+      "20220602000000",
+      "--endTime",
+      "20220601000000")
+  }
+
+  test("test log batch") {
+    val batchOpts = parse("log", "batch", batchId, "--from", "2", "--size", "5").cliConfig.batchOpts
+    assert(batchOpts.batchId == "f7fd702c-e54e-11ec-8fea-0242ac120002")
+    assert(batchOpts.from == 2)
+    assert(batchOpts.size == 5)
+  }
+
+  test("test log batch without batchId") {
+    expectExit("Must specify batchId for log batch command")(
+      "log",
+      "batch",
+      "--from",
+      "2",
+      "--size",
+      "5")
+  }
+
+  test("test log batch default option") {
+    val batchOpts = parse("log", "batch", batchId).cliConfig.batchOpts
+    assert(batchOpts.batchId == "f7fd702c-e54e-11ec-8fea-0242ac120002")
+    assert(batchOpts.from == -1)
+    assert(batchOpts.size == 100)
+  }
+
+}
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
index 9f424a742..392f0e3c1 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
@@ -20,33 +20,13 @@ package org.apache.kyuubi.ctl
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
-class ControlCliArgumentsSuite extends KyuubiFunSuite {
+class ControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
val zkQuorum = "localhost:2181"
val namespace = "kyuubi"
val user = "kyuubi"
val host = "localhost"
val port = "10000"
- /** Check whether the script exits and the given search string is printed. */
- private def testPrematureExit(args: Array[String], searchString: String): Unit = {
- val logAppender = new LogAppender("test premature exit")
- withLogAppender(logAppender) {
- val thread = new Thread {
- override def run(): Unit =
- try {
- new ControlCliArguments(args)
- } catch {
- case e: Exception =>
- error(e)
- }
- }
- thread.start()
- thread.join()
- assert(logAppender.loggingEvents.exists(
- _.getMessage.getFormattedMessage.contains(searchString)))
- }
- }
-
/** Check whether the script exits and the given search string is printed. */
private def testHelpExit(args: Array[String], searchString: String): Unit = {
val logAppender = new LogAppender("test premature exit")
@@ -91,14 +71,14 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
"--version",
KYUUBI_VERSION)
val opArgs = new ControlCliArguments(args)
- assert(opArgs.cliArgs.action.toString.equalsIgnoreCase(op))
- assert(opArgs.cliArgs.service.toString.equalsIgnoreCase(service))
- assert(opArgs.cliArgs.commonOpts.zkQuorum == zkQuorum)
- assert(opArgs.cliArgs.commonOpts.namespace == namespace)
- assert(opArgs.cliArgs.engineOpts.user == user)
- assert(opArgs.cliArgs.commonOpts.host == host)
- assert(opArgs.cliArgs.commonOpts.port == port)
- assert(opArgs.cliArgs.commonOpts.version == KYUUBI_VERSION)
+ assert(opArgs.cliConfig.action.toString.equalsIgnoreCase(op))
+ assert(opArgs.cliConfig.resource.toString.equalsIgnoreCase(service))
+ assert(opArgs.cliConfig.commonOpts.zkQuorum == zkQuorum)
+ assert(opArgs.cliConfig.commonOpts.namespace == namespace)
+ assert(opArgs.cliConfig.engineOpts.user == user)
+ assert(opArgs.cliConfig.commonOpts.host == host)
+ assert(opArgs.cliConfig.commonOpts.port == port)
+ assert(opArgs.cliConfig.commonOpts.version == KYUUBI_VERSION)
}
}
@@ -120,31 +100,37 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
"--version",
KYUUBI_VERSION)
val opArgs = new ControlCliArguments(args)
- assert(opArgs.cliArgs.action.toString.equalsIgnoreCase(op))
- assert(opArgs.cliArgs.service.toString.equalsIgnoreCase(service))
- assert(opArgs.cliArgs.commonOpts.zkQuorum == zkQuorum)
- assert(opArgs.cliArgs.commonOpts.namespace == newNamespace)
- assert(opArgs.cliArgs.commonOpts.host == host)
- assert(opArgs.cliArgs.commonOpts.port == port)
- assert(opArgs.cliArgs.commonOpts.version == KYUUBI_VERSION)
+ assert(opArgs.cliConfig.action.toString.equalsIgnoreCase(op))
+ assert(opArgs.cliConfig.resource.toString.equalsIgnoreCase(service))
+ assert(opArgs.cliConfig.commonOpts.zkQuorum == zkQuorum)
+ assert(opArgs.cliConfig.commonOpts.namespace == newNamespace)
+ assert(opArgs.cliConfig.commonOpts.host == host)
+ assert(opArgs.cliConfig.commonOpts.port == port)
+ assert(opArgs.cliConfig.commonOpts.version == KYUUBI_VERSION)
}
}
test("prints usage on empty input") {
- testPrematureExit(Array.empty[String], "Must specify action command: [create|get|delete|list].")
- testPrematureExit(Array("--verbose"), "Must specify action command: [create|get|delete|list].")
+ testPrematureExitForControlCliArgs(
+ Array.empty[String],
+ "Must specify action command: [create|get|delete|list|log|submit].")
+ testPrematureExitForControlCliArgs(
+ Array("--verbose"),
+ "Must specify action command: [create|get|delete|list|log|submit].")
}
test("prints error with unrecognized options") {
- testPrematureExit(Array("create", "--unknown"), "Unknown option --unknown")
- testPrematureExit(Array("--unknown"), "Unknown option --unknown")
+ testPrematureExitForControlCliArgs(Array("create", "--unknown"), "Unknown option --unknown")
+ testPrematureExitForControlCliArgs(Array("--unknown"), "Unknown option --unknown")
}
test("test invalid arguments") {
// for server, user option is not support
- testPrematureExit(Array("create", "--user"), "Unknown option --user")
+ testPrematureExitForControlCliArgs(Array("create", "--user"), "Unknown option --user")
// for engine, user option need a value
- testPrematureExit(Array("get", "engine", "--user"), "Missing value after --user")
+ testPrematureExitForControlCliArgs(
+ Array("get", "engine", "--user"),
+ "Missing value after --user")
}
test("test extra unused arguments") {
@@ -152,13 +138,13 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
"list",
"extraArg1",
"extraArg2")
- testPrematureExit(args, "Unknown argument 'extraArg1'")
+ testPrematureExitForControlCliArgs(args, "Unknown argument 'extraArg1'")
}
test("test list action arguments") {
val args = Array(
"list")
- testPrematureExit(args, "Zookeeper quorum is not specified")
+ testPrematureExitForControlCliArgs(args, "Zookeeper quorum is not specified")
val args2 = Array(
"list",
@@ -167,14 +153,14 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
"--namespace",
namespace)
val opArgs = new ControlCliArguments(args2)
- assert(opArgs.cliArgs.action == ServiceControlAction.LIST)
+ assert(opArgs.cliConfig.action == ControlAction.LIST)
}
test("test get/delete action arguments") {
Seq("get", "delete").foreach { op =>
val args = Array(
op)
- testPrematureExit(args, "Zookeeper quorum is not specified")
+ testPrematureExitForControlCliArgs(args, "Zookeeper quorum is not specified")
val args2 = Array(
op,
@@ -182,7 +168,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
zkQuorum,
"--namespace",
namespace)
- testPrematureExit(args2, "Must specify host")
+ testPrematureExitForControlCliArgs(args2, "Must specify host")
val args3 = Array(
op,
@@ -192,7 +178,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
namespace,
"--host",
host)
- testPrematureExit(args3, "Must specify port")
+ testPrematureExitForControlCliArgs(args3, "Must specify port")
val args4 = Array(
op,
@@ -205,7 +191,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
host,
"--port",
port)
- testPrematureExit(args4, "Must specify user name for engine")
+ testPrematureExitForControlCliArgs(args4, "Must specify user name for engine")
val args5 = Array(
op,
@@ -219,7 +205,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
"--port",
port)
val opArgs6 = new ControlCliArguments(args5)
- assert(opArgs6.cliArgs.action.toString.equalsIgnoreCase(op))
+ assert(opArgs6.cliConfig.action.toString.equalsIgnoreCase(op))
}
}
@@ -235,7 +221,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
"unknown-host",
"--port",
port)
- testPrematureExit(args, "Unknown host")
+ testPrematureExitForControlCliArgs(args, "Unknown host")
}
test("test with invalid port specification") {
@@ -250,7 +236,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
host,
"--port",
"invalid-format")
- testPrematureExit(args, "Specified port is not a valid integer number")
+ testPrematureExitForControlCliArgs(args, "Specified port is not a valid integer number")
val args2 = Array(
"get",
@@ -263,7 +249,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
host,
"--port",
"0")
- testPrematureExit(args2, "Specified port should be a positive number")
+ testPrematureExitForControlCliArgs(args2, "Specified port should be a positive number")
}
test("test create action arguments") {
@@ -272,7 +258,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
val op = "create"
val args = Array(
op)
- testPrematureExit(args, "Zookeeper quorum is not specified")
+ testPrematureExitForControlCliArgs(args, "Zookeeper quorum is not specified")
val args2 = Array(
op,
@@ -282,7 +268,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
"--namespace",
newNamespace)
val opArgs2 = new ControlCliArguments(args2)
- assert(opArgs2.cliArgs.action.toString.equalsIgnoreCase(op))
+ assert(opArgs2.cliConfig.action.toString.equalsIgnoreCase(op))
val args4 = Array(
op,
@@ -292,7 +278,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
"--namespace",
newNamespace)
// engine is not support, expect scopt print Unknown argument.
- testPrematureExit(args4, "Unknown argument 'engine'")
+ testPrematureExitForControlCliArgs(args4, "Unknown argument 'engine'")
}
}
@@ -302,8 +288,8 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
"--zk-quorum",
zkQuorum)
val opArgs = new ControlCliArguments(args)
- assert(opArgs.cliArgs.commonOpts.namespace == namespace)
- assert(opArgs.cliArgs.commonOpts.version == KYUUBI_VERSION)
+ assert(opArgs.cliConfig.commonOpts.namespace == namespace)
+ assert(opArgs.cliConfig.commonOpts.version == KYUUBI_VERSION)
}
test("test use short options") {
@@ -325,14 +311,14 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
"-v",
KYUUBI_VERSION)
val opArgs = new ControlCliArguments(args)
- assert(opArgs.cliArgs.action.toString.equalsIgnoreCase(op))
- assert(opArgs.cliArgs.service.toString.equalsIgnoreCase(service))
- assert(opArgs.cliArgs.commonOpts.zkQuorum == zkQuorum)
- assert(opArgs.cliArgs.commonOpts.namespace == namespace)
- assert(opArgs.cliArgs.engineOpts.user == user)
- assert(opArgs.cliArgs.commonOpts.host == host)
- assert(opArgs.cliArgs.commonOpts.port == port)
- assert(opArgs.cliArgs.commonOpts.version == KYUUBI_VERSION)
+ assert(opArgs.cliConfig.action.toString.equalsIgnoreCase(op))
+ assert(opArgs.cliConfig.resource.toString.equalsIgnoreCase(service))
+ assert(opArgs.cliConfig.commonOpts.zkQuorum == zkQuorum)
+ assert(opArgs.cliConfig.commonOpts.namespace == namespace)
+ assert(opArgs.cliConfig.engineOpts.user == user)
+ assert(opArgs.cliConfig.commonOpts.host == host)
+ assert(opArgs.cliConfig.commonOpts.port == port)
+ assert(opArgs.cliConfig.commonOpts.version == KYUUBI_VERSION)
}
}
@@ -343,7 +329,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
zkQuorum,
"-b")
val opArgs3 = new ControlCliArguments(args2)
- assert(opArgs3.cliArgs.commonOpts.verbose)
+ assert(opArgs3.cliConfig.commonOpts.verbose)
}
test("test --help") {
@@ -353,7 +339,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
" change it if the active service is running in another."
val helpString =
s"""kyuubi $KYUUBI_VERSION
- |Usage: kyuubi-ctl [create|get|delete|list] [options]
+ |Usage: kyuubi-ctl [create|get|delete|list|log|submit] [options] <args>...
|
| -zk, --zk-quorum <value>
| $zkHelpString
@@ -362,14 +348,25 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
| -p, --port <value> Listening port of a service.
| -v, --version <value> $versionHelpString
| -b, --verbose Print additional debug output.
+ | --hostUrl <value> Host url for rest api.
+ | --authSchema <value> Auth schema for rest api, valid values are basic, spnego.
+ | --username <value> Username for basic authentication.
+ | --password <value> Password for basic authentication.
+ | --spnegoHost <value> Spnego host for spnego authentication.
|
- |Command: create [server]
- |
+ |Command: create [batch|server] [options]
+ |${"\t"}Create a resource.
+ | -f, --filename <value> Filename to use to create the resource
+ |Command: create batch
+ |${"\t"}Open batch session.
|Command: create server
|${"\t"}Expose Kyuubi server instance to another domain.
|
- |Command: get [server|engine] [options]
- |${"\t"}Get the service/engine node info, host and port needed.
+ |Command: get [batch|server|engine] [options] [<batchId>]
+ |${"\t"}Display information about the specified resources.
+ |Command: get batch
+ |${"\t"}Get batch by id.
+ | <batchId> Batch id.
|Command: get server
|${"\t"}Get Kyuubi server info of domain
|Command: get engine
@@ -382,8 +379,12 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
| -esl, --engine-share-level <value>
| The engine share level this engine belong to.
|
- |Command: delete [server|engine] [options]
- |${"\t"}Delete the specified service/engine node, host and port needed.
+ |Command: delete [batch|server|engine] [options] [<batchId>]
+ |${"\t"}Delete resources.
+ |Command: delete batch
+ |${"\t"}Close batch session.
+ | <batchId> Batch id.
+ | --hs2ProxyUser <value> The value of hive.server2.proxy.user config.
|Command: delete server
|${"\t"}Delete the specified service node for a domain
|Command: delete engine
@@ -396,8 +397,17 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
| -esl, --engine-share-level <value>
| The engine share level this engine belong to.
|
- |Command: list [server|engine] [options]
- |${"\t"}List all the service/engine nodes for a particular domain.
+ |Command: list [batch|server|engine] [options]
+ |${"\t"}List information about resources.
+ |Command: list batch
+ |${"\t"}List batch session info.
+ | --batchType <value> Batch type.
+ | --batchUser <value> Batch user.
+ | --batchState <value> Batch state.
+ | --createTime <value> Batch create time, should be in yyyyMMddHHmmss format.
+ | --endTime <value> Batch end time, should be in yyyyMMddHHmmss format.
+ | --from <value> Specify which record to start from retrieving info.
+ | --size <value> The max number of records returned in the query.
|Command: list server
|${"\t"}List all the service nodes for a particular domain
|Command: list engine
@@ -410,6 +420,21 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
| -esl, --engine-share-level <value>
| The engine share level this engine belong to.
|
+ |Command: log [batch] [options] [<batchId>]
+ |${"\t"}Print the logs for specified resource.
+ | --forward If forward is specified, the ctl will block forever.
+ |Command: log batch
+ |${"\t"}Get batch session local log.
+ | <batchId> Batch id.
+ | --from <value> Specify which record to start from retrieving info.
+ | --size <value> The max number of records returned in the query.
+ |
+ |Command: submit [batch] [options]
+ |${"\t"}Combination of create, get and log commands.
+ | -f, --filename <value> Filename to use to create the resource
+ |Command: submit batch
+ |${"\t"}open batch session and wait for completion.
+ |
| -h, --help Show help message and exit.""".stripMargin
testHelpExit(Array("--help"), helpString)
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala
index f1a9984b9..cdd87b147 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala
@@ -17,73 +17,15 @@
package org.apache.kyuubi.ctl
-import java.io.{OutputStream, PrintStream}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_NAMESPACE}
import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
-trait TestPrematureExit {
- suite: KyuubiFunSuite =>
-
- private val noOpOutputStream = new OutputStream {
- def write(b: Int) = {}
- }
-
- /** Simple PrintStream that reads data into a buffer */
- private class BufferPrintStream extends PrintStream(noOpOutputStream) {
- var lineBuffer = ArrayBuffer[String]()
- // scalastyle:off println
- override def println(line: Any): Unit = {
- lineBuffer += line.toString
- }
- // scalastyle:on println
- }
-
- /** Returns true if the script exits and the given search string is printed. */
- private[kyuubi] def testPrematureExit(
- input: Array[String],
- searchString: String,
- mainObject: CommandLineUtils = ControlCli): Unit = {
- val printStream = new BufferPrintStream()
- mainObject.printStream = printStream
-
- @volatile var exitedCleanly = false
- val original = mainObject.exitFn
- mainObject.exitFn = (_) => exitedCleanly = true
- try {
- @volatile var exception: Exception = null
- val thread = new Thread {
- override def run() =
- try {
- mainObject.main(input)
- } catch {
- // Capture the exception to check whether the exception contains searchString or not
- case e: Exception => exception = e
- }
- }
- thread.start()
- thread.join()
- if (exitedCleanly) {
- val joined = printStream.lineBuffer.mkString("\n")
- assert(joined.contains(searchString))
- } else {
- assert(exception != null)
- if (!exception.getMessage.contains(searchString)) {
- throw exception
- }
- }
- } finally {
- mainObject.exitFn = original
- }
- }
-}
-
class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
import DiscoveryClientProvider._
@@ -149,7 +91,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
host,
"--port",
port)
- testPrematureExit(args, "Only support expose Kyuubi server instance to another domain")
+ testPrematureExitForControlCli(
+ args,
+ "Only support expose Kyuubi server instance to another domain")
}
test("test not specified namespace") {
@@ -168,7 +112,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
host,
"--port",
port)
- testPrematureExit(args2, "Only support expose Kyuubi server instance to another domain")
+ testPrematureExitForControlCli(
+ args2,
+ "Only support expose Kyuubi server instance to another domain")
}
test("test expose to another namespace") {
@@ -190,7 +136,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
host,
"--port",
port)
- testPrematureExit(args, "")
+ testPrematureExitForControlCli(args, "")
}
test("test render zookeeper service node info") {
@@ -240,7 +186,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None))
- testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
+ testPrematureExitForControlCli(
+ args,
+ getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
val znodeRoot = s"/$newNamespace"
val children = framework.getChildren(znodeRoot).sorted
assert(children.size == 2)
@@ -264,7 +212,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--namespace",
namespace)
val scArgs1 = new ControlCliArguments(arg1)
- assert(scArgs1.command.getZkNamespace() == s"/$namespace")
+ assert(CtlUtils.getZkNamespace(
+ scArgs1.command.conf,
+ scArgs1.command.normalizedCliConfig) == s"/$namespace")
val arg2 = Array(
"list",
@@ -276,7 +226,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--user",
user)
val scArgs2 = new ControlCliArguments(arg2)
- assert(scArgs2.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs2.command.conf, scArgs2.command.normalizedCliConfig) ==
s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
}
@@ -305,7 +255,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None))
- testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
+ testPrematureExitForControlCli(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
}
}
@@ -337,7 +287,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
val expectedNodes = Seq(
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None))
- testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
+ testPrematureExitForControlCli(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
}
}
@@ -371,7 +321,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
val expectedDeletedNodes = Seq(
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None))
- testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedDeletedNodes, false))
+ testPrematureExitForControlCli(
+ args,
+ getRenderedNodesInfoWithoutTitle(expectedDeletedNodes, false))
}
}
@@ -401,7 +353,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None))
- testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, true))
+ testPrematureExitForControlCli(args, getRenderedNodesInfoWithoutTitle(expectedNodes, true))
}
}
@@ -416,7 +368,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--user",
user)
val scArgs1 = new ControlCliArguments(arg1)
- assert(scArgs1.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs1.command.conf, scArgs1.command.normalizedCliConfig) ==
s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
val arg2 = Array(
@@ -431,7 +383,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-type",
"FLINK_SQL")
val scArgs2 = new ControlCliArguments(arg2)
- assert(scArgs2.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs2.command.conf, scArgs2.command.normalizedCliConfig) ==
s"/${namespace}_${KYUUBI_VERSION}_USER_FLINK_SQL/$user/default")
val arg3 = Array(
@@ -446,7 +398,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-type",
"TRINO")
val scArgs3 = new ControlCliArguments(arg3)
- assert(scArgs3.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs3.command.conf, scArgs3.command.normalizedCliConfig) ==
s"/${namespace}_${KYUUBI_VERSION}_USER_TRINO/$user/default")
val arg4 = Array(
@@ -463,7 +415,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-subdomain",
"sub_1")
val scArgs4 = new ControlCliArguments(arg4)
- assert(scArgs4.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs4.command.conf, scArgs4.command.normalizedCliConfig) ==
s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/sub_1")
val arg5 = Array(
@@ -482,7 +434,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-subdomain",
"sub_1")
val scArgs5 = new ControlCliArguments(arg5)
- assert(scArgs5.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs5.command.conf, scArgs5.command.normalizedCliConfig) ==
s"/${namespace}_1.5.0_USER_SPARK_SQL/$user/sub_1")
}
@@ -497,7 +449,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--user",
user)
val scArgs1 = new ControlCliArguments(arg1)
- assert(scArgs1.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs1.command.conf, scArgs1.command.normalizedCliConfig) ==
s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
val arg2 = Array(
@@ -512,7 +464,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-share-level",
"CONNECTION")
val scArgs2 = new ControlCliArguments(arg2)
- assert(scArgs2.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs2.command.conf, scArgs2.command.normalizedCliConfig) ==
s"/${namespace}_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL/$user/default")
val arg3 = Array(
@@ -527,7 +479,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-share-level",
"USER")
val scArgs3 = new ControlCliArguments(arg3)
- assert(scArgs3.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs3.command.conf, scArgs3.command.normalizedCliConfig) ==
s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
val arg4 = Array(
@@ -542,7 +494,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-share-level",
"GROUP")
val scArgs4 = new ControlCliArguments(arg4)
- assert(scArgs4.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs4.command.conf, scArgs4.command.normalizedCliConfig) ==
s"/${namespace}_${KYUUBI_VERSION}_GROUP_SPARK_SQL/$user/default")
val arg5 = Array(
@@ -557,7 +509,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
"--engine-share-level",
"SERVER")
val scArgs5 = new ControlCliArguments(arg5)
- assert(scArgs5.command.getZkNamespace() ==
+ assert(CtlUtils.getZkNamespace(scArgs5.command.conf, scArgs5.command.normalizedCliConfig) ==
s"/${namespace}_${KYUUBI_VERSION}_SERVER_SPARK_SQL/$user/default")
}
}
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/TestPrematureExit.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/TestPrematureExit.scala
new file mode 100644
index 000000000..6477b9315
--- /dev/null
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/TestPrematureExit.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ctl
+
+import java.io.{OutputStream, PrintStream}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.ctl.util.CommandLineUtils
+
+trait TestPrematureExit {
+ suite: KyuubiFunSuite =>
+
+ private val noOpOutputStream = new OutputStream {
+ def write(b: Int) = {}
+ }
+
+ /** Simple PrintStream that reads data into a buffer */
+ private class BufferPrintStream extends PrintStream(noOpOutputStream) {
+ var lineBuffer = ArrayBuffer[String]()
+ // scalastyle:off println
+ override def println(line: Any): Unit = {
+ lineBuffer += line.toString
+ }
+ // scalastyle:on println
+ }
+
+ /** Returns true if the script exits and the given search string is printed. */
+ private[kyuubi] def testPrematureExitForControlCli(
+ input: Array[String],
+ searchString: String,
+ mainObject: CommandLineUtils = ControlCli): String = {
+ val printStream = new BufferPrintStream()
+ mainObject.printStream = printStream
+
+ @volatile var exitedCleanly = false
+ val original = mainObject.exitFn
+ mainObject.exitFn = (_) => exitedCleanly = true
+ try {
+ @volatile var exception: Exception = null
+ val thread = new Thread {
+ override def run() =
+ try {
+ mainObject.main(input)
+ } catch {
+ // Capture the exception to check whether the exception contains searchString or not
+ case e: Exception => exception = e
+ }
+ }
+ thread.start()
+ thread.join()
+ var joined = ""
+ if (exitedCleanly) {
+ joined = printStream.lineBuffer.mkString("\n")
+ assert(joined.contains(searchString))
+ } else {
+ assert(exception != null)
+ if (!exception.getMessage.contains(searchString)) {
+ throw exception
+ }
+ }
+ joined
+ } finally {
+ mainObject.exitFn = original
+ }
+ }
+
+ def testPrematureExitForControlCliArgs(args: Array[String], searchString: String): Unit = {
+ val logAppender = new LogAppender("test premature exit")
+ withLogAppender(logAppender) {
+ val thread = new Thread {
+ override def run(): Unit =
+ try {
+ new ControlCliArguments(args)
+ } catch {
+ case e: Exception =>
+ error(e)
+ }
+ }
+ thread.start()
+ thread.join()
+ assert(logAppender.loggingEvents.exists(
+ _.getMessage.getFormattedMessage.contains(searchString)))
+ }
+ }
+}
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
index 121579607..a9c064477 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
@@ -60,10 +60,10 @@ public class BatchRestApi {
params.put("batchType", batchType);
params.put("batchUser", batchUser);
params.put("batchState", batchState);
- if (null != createTime && createTime >= 0) {
+ if (null != createTime && createTime > 0) {
params.put("createTime", createTime);
}
- if (null != endTime && endTime >= 0) {
+ if (null != endTime && endTime > 0) {
params.put("endTime", endTime);
}
params.put("from", from);
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 4e2224e3b..d7ff2d0f7 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -226,6 +226,21 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-ctl_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-ctl_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-download</artifactId>
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala
index 616b5c669..8317837cb 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.kyuubi.{KyuubiFunSuite, TestUtils, Utils}
+import org.apache.kyuubi.ctl.CtlConf
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.metrics.MetricsConf
import org.apache.kyuubi.server.statestore.jdbc.JDBCStateStoreConf
@@ -69,8 +70,9 @@ class AllKyuubiConfiguration extends KyuubiFunSuite {
test("Check all kyuubi configs") {
KyuubiConf
- JDBCStateStoreConf
+ CtlConf
HighAvailabilityConf
+ JDBCStateStoreConf
MetricsConf
ZookeeperConf
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
new file mode 100644
index 000000000..de662e5e8
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.rest.client
+
+import java.net.InetAddress
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.{RestClientTestHelper, Utils}
+import org.apache.kyuubi.client.api.v1.dto.BatchRequest
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SPARK_MAX_LIFETIME}
+import org.apache.kyuubi.ctl.TestPrematureExit
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.session.KyuubiSessionManager
+
+class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
+ // Exercises the control CLI batch commands: create, get, log, delete, submit and list.
+ val basePath: String = Utils.getCodeSourceLocation(getClass)
+ val batchFile: String = s"${basePath}/batch.yaml"
+ val appName: String = "test_batch"
+ // Sets the kyuubi.ctl.rest.* system properties and writes the batch.yaml shared by the tests.
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ // NOTE(review): the JVM-wide properties set below are never cleared in this class.
+ System.setProperty("kyuubi.ctl.rest.base.url", baseUri.toString)
+ System.setProperty("kyuubi.ctl.rest.spnego.host", "localhost")
+ // The builder is only used to look up the main resource and main class for the request.
+ val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+ val batch_basic = s"""apiVersion: v1
+ |batchType: Spark
+ |username: ${ldapUser}
+ |request:
+ | name: ${appName}
+ | resource: ${sparkProcessBuilder.mainResource.get}
+ | className: ${sparkProcessBuilder.mainClass}
+ | args:
+ | - x1
+ | - x2
+ | configs:
+ | spark.master: local
+ | spark.${ENGINE_SPARK_MAX_LIFETIME.key}: "5000"
+ | spark.${ENGINE_CHECK_INTERVAL.key}: "1000"
+ | k1: v1
+ |options:
+ | verbose: true""".stripMargin
+ Files.write(Paths.get(batchFile), batch_basic.getBytes(StandardCharsets.UTF_8))
+ }
+ // Closes all sessions, then kills the application and cleans metadata of each returned batch.
+ override def afterEach(): Unit = { // NOTE(review): does not call super.afterEach()
+ val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+ sessionManager.allSessions().foreach { session =>
+ sessionManager.closeSession(session.handle)
+ }
+ sessionManager.getBatchesFromStateStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach {
+ batch =>
+ sessionManager.applicationManager.killApplication(None, batch.getId)
+ sessionManager.cleanupMetadata(batch.getId)
+ }
+ }
+ // Batch lifecycle with --password credentials: create, get, log (expects 2 rows), delete.
+ test("basic batch rest client") {
+ val createArgs = Array(
+ "create",
+ "batch",
+ "-f",
+ batchFile,
+ "--password",
+ ldapUserPasswd)
+ var result = testPrematureExitForControlCli(createArgs, "")
+ assert(result.contains("Type: SPARK"))
+ assert(result.contains(s"Kyuubi Instance: ${fe.connectionUrl}"))
+ val startIndex = result.indexOf("Id: ") + 4
+ val endIndex = result.indexOf("\n", startIndex)
+ val batchId = result.substring(startIndex, endIndex)
+ // NOTE(review): the id parsing above assumes an "Id: " line followed by a newline.
+ val getArgs = Array(
+ "get",
+ "batch",
+ batchId,
+ "--username",
+ ldapUser,
+ "--password",
+ ldapUserPasswd)
+ result = testPrematureExitForControlCli(getArgs, "Type: SPARK")
+ assert(result.contains("Type: SPARK"))
+ assert(result.contains(s"Kyuubi Instance: ${fe.connectionUrl}"))
+ // --size 2: the log output is expected to split into exactly two lines.
+ val logArgs = Array(
+ "log",
+ "batch",
+ batchId,
+ "--size",
+ "2",
+ "--username",
+ ldapUser,
+ "--password",
+ ldapUserPasswd)
+ result = testPrematureExitForControlCli(logArgs, "")
+ val rows = result.split("\n")
+ assert(rows.length === 2)
+
+ val deleteArgs = Array(
+ "delete",
+ "batch",
+ batchId,
+ "--username",
+ ldapUser,
+ "--password",
+ ldapUserPasswd)
+ result = testPrematureExitForControlCli(deleteArgs, "\"success\":true")
+ }
+ // Same lifecycle as the basic test, but using --authSchema SPNEGO after a keytab login.
+ test("spnego batch rest client") {
+ UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
+ // NOTE(review): presumably a process-wide login that is never reverted — confirm.
+ val createArgs = Array(
+ "create",
+ "batch",
+ "-f",
+ batchFile,
+ "--authSchema",
+ "SPNEGO")
+ var result = testPrematureExitForControlCli(createArgs, "")
+ assert(result.contains("Type: SPARK"))
+ assert(result.contains(s"Kyuubi Instance: ${fe.connectionUrl}"))
+ val startIndex = result.indexOf("Id: ") + 4
+ val endIndex = result.indexOf("\n", startIndex)
+ val batchId = result.substring(startIndex, endIndex)
+ // "SPNEGO" above, "spnego" below: presumably --authSchema is case-insensitive.
+ val getArgs = Array(
+ "get",
+ "batch",
+ batchId,
+ "--authSchema",
+ "spnego")
+ result = testPrematureExitForControlCli(getArgs, "Type: SPARK")
+ assert(result.contains("Type: SPARK"))
+ assert(result.contains(s"Kyuubi Instance: ${fe.connectionUrl}"))
+
+ val logArgs = Array(
+ "log",
+ "batch",
+ batchId,
+ "--size",
+ "2",
+ "--authSchema",
+ "spnego")
+ result = testPrematureExitForControlCli(logArgs, "")
+ val rows = result.split("\n")
+ assert(rows.length === 2)
+
+ val deleteArgs = Array(
+ "delete",
+ "batch",
+ batchId,
+ "--authSchema",
+ "spnego")
+ result = testPrematureExitForControlCli(deleteArgs, "\"success\":true")
+ }
+ // Creates a batch, then reads its log with --forward and expects the submit line for appName.
+ test("log batch test") {
+ val createArgs = Array(
+ "create",
+ "batch",
+ "-f",
+ batchFile,
+ "--password",
+ ldapUserPasswd)
+ val result = testPrematureExitForControlCli(createArgs, "")
+ assert(result.contains("Type: SPARK"))
+ assert(result.contains(s"Kyuubi Instance: ${fe.connectionUrl}"))
+ val startIndex = result.indexOf("Id: ") + 4
+ val endIndex = result.indexOf("\n", startIndex)
+ val batchId = result.substring(startIndex, endIndex)
+
+ val logArgs = Array(
+ "log",
+ "batch",
+ batchId,
+ "--size",
+ "100",
+ "--username",
+ ldapUser,
+ "--password",
+ ldapUserPasswd,
+ "--forward")
+ testPrematureExitForControlCli(logArgs, s"Submitted application: ${appName}")
+ }
+ // Runs "submit batch" on the same file and expects the "Submitted application" line.
+ test("submit batch test") {
+ val submitArgs = Array(
+ "submit",
+ "batch",
+ "-f",
+ batchFile,
+ "--password",
+ ldapUserPasswd)
+ testPrematureExitForControlCli(submitArgs, s"Submitted application: ${appName}")
+ }
+ // Opens 3 batch and 2 plain sessions directly, then checks the totals printed by "list batch".
+ test("list batch test") {
+ val sessionManager = server.frontendServices.head
+ .be.sessionManager.asInstanceOf[KyuubiSessionManager]
+ sessionManager.allSessions().foreach(_.close())
+
+ sessionManager.openBatchSession(
+ "kyuubi",
+ "kyuubi",
+ InetAddress.getLocalHost.getCanonicalHostName,
+ Map.empty,
+ new BatchRequest(
+ "spark",
+ "",
+ "",
+ ""))
+ sessionManager.openSession(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
+ "",
+ "",
+ "",
+ Map.empty)
+ sessionManager.openSession(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
+ "",
+ "",
+ "",
+ Map.empty)
+ sessionManager.openBatchSession(
+ "kyuubi",
+ "kyuubi",
+ InetAddress.getLocalHost.getCanonicalHostName,
+ Map.empty,
+ new BatchRequest(
+ "spark",
+ "",
+ "",
+ ""))
+ sessionManager.openBatchSession(
+ "kyuubi",
+ "kyuubi",
+ InetAddress.getLocalHost.getCanonicalHostName,
+ Map.empty,
+ new BatchRequest(
+ "spark",
+ "",
+ "",
+ ""))
+ // Filtering by type spark, user kyuubi and --createTime 20220101000000 should report 3.
+ val listArgs = Array(
+ "list",
+ "batch",
+ "--username",
+ ldapUser,
+ "--password",
+ ldapUserPasswd,
+ "--batchType",
+ "spark",
+ "--batchUser",
+ "kyuubi",
+ "--createTime",
+ "20220101000000")
+ testPrematureExitForControlCli(listArgs, "Total number of batches: 3")
+ // Filtering by --endTime 20220101000000 should report 0.
+ val listArgs1 = Array(
+ "list",
+ "batch",
+ "--username",
+ ldapUser,
+ "--password",
+ ldapUserPasswd,
+ "--endTime",
+ "20220101000000")
+ testPrematureExitForControlCli(listArgs1, "Total number of batches: 0")
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index a168191b2..4deaefdd3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,6 +156,7 @@
<scalatestplus.version>3.2.9.0</scalatestplus.version>
<scopt.version>4.0.1</scopt.version>
<slf4j.version>1.7.35</slf4j.version>
+ <snakeyaml.version>1.30</snakeyaml.version>
<!--
DO NOT forget to change the following properties when change the minor version of Spark:
`delta.version`, `iceberg.name`, `maven.plugin.scalatest.exclude.tags`