You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/20 03:56:42 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2628][FOLLOWUP] Support waitCompletion for submit batch
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c664c84f8 [KYUUBI #2628][FOLLOWUP] Support waitCompletion for submit batch
c664c84f8 is described below
commit c664c84f80ef04de2956f36ae79d52d303613fed
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Mon Jun 20 11:56:36 2022 +0800
[KYUUBI #2628][FOLLOWUP] Support waitCompletion for submit batch
### _Why are the changes needed?_
Support waitCompletion for submit batch
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2909 from lightning-L/kyuubi-2628-1.
Closes #2628
23fe761f [Tianlin Liao] minor fix
06cd66ff [Tianlin Liao] support waitUntilComplete for submit batch; use kyuubi instance url for log batch/submit batch
Authored-by: Tianlin Liao <ti...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
docs/deployment/settings.md | 1 +
.../scala/org/apache/kyuubi/ctl/CliConfig.scala | 3 +-
.../scala/org/apache/kyuubi/ctl/CommandLine.scala | 17 +++-
.../main/scala/org/apache/kyuubi/ctl/CtlConf.scala | 7 ++
.../org/apache/kyuubi/ctl/RestClientFactory.scala | 12 +++
.../scala/org/apache/kyuubi/ctl/cmd/Command.scala | 28 -------
.../kyuubi/ctl/cmd/create/CreateBatchCommand.scala | 11 ++-
.../kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala | 6 +-
.../kyuubi/ctl/cmd/log/LogBatchCommand.scala | 91 +++++++++++++++-------
.../kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala | 9 ++-
.../org/apache/kyuubi/ctl/util/Validator.scala | 6 +-
.../apache/kyuubi/ctl/BatchCliArgumentsSuite.scala | 42 ++++++++--
.../kyuubi/ctl/ControlCliArgumentsSuite.scala | 5 ++
.../org/apache/kyuubi/client/KyuubiRestClient.java | 54 ++++++++++---
.../kyuubi/server/rest/client/BatchCliSuite.scala | 37 +++++++--
15 files changed, 233 insertions(+), 96 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 36980ceb5..8e2645cdf 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -187,6 +187,7 @@ kyuubi.credentials.update.wait.timeout|PT1M|How long to wait until credentials a
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
+kyuubi.ctl.batch.log.query.interval|PT3S|The interval for fetching batch logs.|duration|1.6.0
kyuubi.ctl.rest.auth.schema|basic|The authentication schema. Valid values are: basic, spnego.|string|1.6.0
kyuubi.ctl.rest.base.url|<undefined>|The REST API base URL, which contains the scheme (http:// or https://), host name, port number|string|1.6.0
kyuubi.ctl.rest.request.attempt.wait|PT3S|How long to wait between attempts of ctl rest request.|duration|1.6.0
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
index 2b559d012..d5c8d49e5 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
@@ -65,7 +65,8 @@ case class BatchOpts(
endTime: Long = 0,
from: Int = -1,
size: Int = 100,
- hs2ProxyUser: String = null)
+ hs2ProxyUser: String = null,
+ waitCompletion: Boolean = true)
case class EngineOpts(
user: String = null,
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
index b1bef0224..2dc8be824 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
@@ -105,7 +105,7 @@ object CommandLine {
opt[String]('f', "filename")
.action((v, c) => c.copy(createOpts = c.createOpts.copy(filename = v)))
.text("Filename to use to create the resource"),
- batchCmd(builder).text("\tOpen batch session."),
+ createBatchCmd(builder).text("\tOpen batch session."),
serverCmd(builder).text("\tExpose Kyuubi server instance to another domain.")))
}
@@ -176,7 +176,7 @@ object CommandLine {
opt[String]('f', "filename")
.action((v, c) => c.copy(createOpts = c.createOpts.copy(filename = v)))
.text("Filename to use to create the resource"),
- batchCmd(builder).text("\topen batch session and wait for completion.")))
+ submitBatchCmd(builder).text("\topen batch session and wait for completion.")))
}
private def serverCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
@@ -202,7 +202,7 @@ object CommandLine {
.text("The engine share level this engine belong to."))
}
- private def batchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+ private def createBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
import builder._
cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
}
@@ -307,4 +307,15 @@ object CommandLine {
.text("The max number of records returned in the query."))
}
+ private def submitBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+ import builder._
+ cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+ .children(
+ opt[Boolean]("waitCompletion")
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(waitCompletion = v)))
+ .text("Boolean property. If true(default), the client process will stay alive " +
+ "until the batch is in any terminal state. If false, the client will exit " +
+ "when the batch is no longer in PENDING state."))
+ }
+
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala
index 9a57120a9..e09a89118 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala
@@ -60,4 +60,11 @@ object CtlConf {
.version("1.6.0")
.timeConf
.createWithDefault(Duration.ofSeconds(3).toMillis)
+
+ val CTL_BATCH_LOG_QUERY_INTERVAL =
+ buildConf("kyuubi.ctl.batch.log.query.interval")
+ .doc("The interval for fetching batch logs.")
+ .version("1.6.0")
+ .timeConf
+ .createWithDefault(Duration.ofSeconds(3).toMillis)
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
index f3fdf6223..a3acbbe0a 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
@@ -40,6 +40,18 @@ object RestClientFactory {
}
}
+ private[ctl] def withKyuubiInstanceRestClient(
+ kyuubiRestClient: KyuubiRestClient,
+ kyuubiInstance: String)(f: KyuubiRestClient => Unit): Unit = {
+ val kyuubiInstanceRestClient = kyuubiRestClient.clone()
+ kyuubiInstanceRestClient.setHostUrls(s"http://${kyuubiInstance}")
+ try {
+ f(kyuubiInstanceRestClient)
+ } finally {
+ kyuubiInstanceRestClient.close()
+ }
+ }
+
private def getKyuubiRestClient(
cliConfig: CliConfig,
map: JMap[String, Object],
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
index 168b789a7..7d0fc19a7 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
@@ -16,12 +16,6 @@
*/
package org.apache.kyuubi.ctl.cmd
-import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-
-import org.yaml.snakeyaml.Yaml
-
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ctl.CliConfig
@@ -30,8 +24,6 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
abstract class Command[T](cliConfig: CliConfig) extends Logging {
- protected val DEFAULT_LOG_QUERY_INTERVAL: Int = 1000
-
val conf = KyuubiConf().loadFileDefaults()
cliConfig.conf.foreach { case (key, value) =>
@@ -91,26 +83,6 @@ abstract class Command[T](cliConfig: CliConfig) extends Logging {
arguments
}
- private[ctl] def readConfig(): HashMap[String, Object] = {
- var filename = normalizedCliConfig.createOpts.filename
-
- var map: HashMap[String, Object] = null
- var br: BufferedReader = null
- try {
- val yaml = new Yaml()
- val input = new FileInputStream(new File(filename))
- br = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
- map = yaml.load(br).asInstanceOf[HashMap[String, Object]]
- } catch {
- case e: Exception => fail(s"Failed to read yaml file[$filename]: $e")
- } finally {
- if (br != null) {
- br.close()
- }
- }
- map
- }
-
override def info(msg: => Any): Unit = printMessage(msg)
override def warn(msg: => Any): Unit = printMessage(s"Warning: $msg")
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala
index 01c48b50e..90a375355 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala
@@ -16,7 +16,10 @@
*/
package org.apache.kyuubi.ctl.cmd.create
-import java.util.{ArrayList, HashMap}
+import java.util.{Map => JMap}
+import java.util.ArrayList
+
+import scala.collection.JavaConverters._
import org.apache.kyuubi.client.BatchRestApi
import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
@@ -37,13 +40,15 @@ class CreateBatchCommand(cliConfig: CliConfig) extends Command[Batch](cliConfig)
withKyuubiRestClient(normalizedCliConfig, map, conf) { kyuubiRestClient =>
val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
- val request = map.get("request").asInstanceOf[HashMap[String, Object]]
+ val request = map.get("request").asInstanceOf[JMap[String, Object]]
+ val config = request.get("configs").asInstanceOf[JMap[Object, Object]].asScala
+ .map { case (k, v) => (k.toString, v.toString) }.asJava
val batchRequest = new BatchRequest(
map.get("batchType").asInstanceOf[String],
request.get("resource").asInstanceOf[String],
request.get("className").asInstanceOf[String],
request.get("name").asInstanceOf[String],
- request.get("configs").asInstanceOf[HashMap[String, String]],
+ config,
request.get("args").asInstanceOf[ArrayList[String]])
batchRestApi.createBatch(batchRequest)
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
index 6f664d334..24f014d7e 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
@@ -25,7 +25,11 @@ import org.apache.kyuubi.ctl.cmd.Command
import org.apache.kyuubi.ctl.util.BatchUtil
class DeleteBatchCommand(cliConfig: CliConfig) extends Command[Batch](cliConfig) {
- def validate(): Unit = {}
+ def validate(): Unit = {
+ if (normalizedCliConfig.batchOpts.batchId == null) {
+ fail("Must specify batchId for delete batch command.")
+ }
+ }
def doRun(): Batch = {
withKyuubiRestClient(normalizedCliConfig, null, conf) { kyuubiRestClient =>
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
index 0a23e028a..daea4f399 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
@@ -22,13 +22,16 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.client.BatchRestApi
import org.apache.kyuubi.client.api.v1.dto.{Batch, OperationLog}
-import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.ctl.CliConfig
-import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.CtlConf._
+import org.apache.kyuubi.ctl.RestClientFactory.{withKyuubiInstanceRestClient, withKyuubiRestClient}
import org.apache.kyuubi.ctl.cmd.Command
import org.apache.kyuubi.ctl.util.{BatchUtil, Render}
-class LogBatchCommand(cliConfig: CliConfig, restConfigMap: JMap[String, Object] = null)
+class LogBatchCommand(
+ cliConfig: CliConfig,
+ batch: Option[Batch] = None,
+ restConfigMap: JMap[String, Object] = null)
extends Command[Batch](cliConfig) {
def validate(): Unit = {
@@ -46,41 +49,73 @@ class LogBatchCommand(cliConfig: CliConfig, restConfigMap: JMap[String, Object]
var log: OperationLog = null
var done = false
- var batch: Batch = null
+ var batch = this.batch.getOrElse(batchRestApi.getBatchById(batchId))
+ val kyuubiInstance = batch.getKyuubiInstance
- while (!done) {
- try {
- log = batchRestApi.getBatchLocalLog(
- batchId,
- from,
- size)
- from += log.getLogRowSet.size
- log.getLogRowSet.asScala.foreach(x => info(x))
- if (!normalizedCliConfig.logOpts.forward) {
- done = true
- }
- } catch {
- case e: KyuubiRestException =>
- error(s"Error fetching batch logs: ${e.getMessage}")
- }
+ withKyuubiInstanceRestClient(kyuubiRestClient, kyuubiInstance) { kyuubiInstanceRestClient =>
+ val kyuubiInstanceBatchRestApi: BatchRestApi = new BatchRestApi(kyuubiInstanceRestClient)
+ while (!done) {
+ try {
+ log = kyuubiInstanceBatchRestApi.getBatchLocalLog(
+ batchId,
+ from,
+ size)
+ from += log.getLogRowSet.size
+ log.getLogRowSet.asScala.foreach(x => info(x))
- if (log == null || log.getLogRowSet.size() == 0) {
- batch = batchRestApi.getBatchById(batchId)
- if (BatchUtil.isTerminalState(batch.getState)) {
- done = true
+ val (latestBatch, shouldFinishLog) =
+ checkStatus(kyuubiInstanceBatchRestApi, batchId, log)
+ batch = latestBatch
+ done = shouldFinishLog
+ } catch {
+ case e: Exception =>
+ val (latestBatch, shouldFinishLog) = checkStatus(batchRestApi, batchId, log)
+ batch = latestBatch
+ done = shouldFinishLog
+ if (done) {
+ error(s"Error fetching batch logs: ${e.getMessage}")
+ }
}
- }
- if (!done) {
- Thread.sleep(DEFAULT_LOG_QUERY_INTERVAL)
+ if (!done) {
+ Thread.sleep(conf.get(CTL_BATCH_LOG_QUERY_INTERVAL).toInt)
+ }
}
}
-
batch
}
}
def render(batch: Batch): Unit = {
- info(Render.renderBatchInfo(batch))
+ if (normalizedCliConfig.logOpts.forward) {
+ info(Render.renderBatchInfo(batch))
+ }
+ }
+
+ private def checkStatus(
+ batchRestApi: BatchRestApi,
+ batchId: String,
+ log: OperationLog): (Batch, Boolean) = {
+ var batch: Batch = null
+
+ if (!normalizedCliConfig.logOpts.forward) {
+ return (batch, true)
+ }
+
+ if (normalizedCliConfig.batchOpts.waitCompletion) {
+ if (log == null || log.getLogRowSet.size == 0) {
+ batch = batchRestApi.getBatchById(batchId)
+ if (BatchUtil.isTerminalState(batch.getState)) {
+ return (batch, true)
+ }
+ }
+ } else {
+ batch = batchRestApi.getBatchById(batchId)
+ if (!BatchUtil.isPendingState(batch.getState)) {
+ return (batch, true)
+ }
+ }
+
+ (batch, false)
}
}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
index 119553fe9..b81dac646 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
@@ -34,14 +34,17 @@ class SubmitBatchCommand(cliConfig: CliConfig) extends Command[Batch](cliConfig)
val map = CtlUtils.loadYamlAsMap(normalizedCliConfig)
val createBatchCommand = new CreateBatchCommand(normalizedCliConfig)
- val batchId = createBatchCommand.doRun().getId
+ var batch = createBatchCommand.doRun()
val logBatchCommand = new LogBatchCommand(
normalizedCliConfig.copy(
- batchOpts = BatchOpts(batchId = batchId),
+ batchOpts = BatchOpts(
+ batchId = batch.getId,
+ waitCompletion = normalizedCliConfig.batchOpts.waitCompletion),
logOpts = LogOpts(forward = true)),
+ Some(batch),
map)
- val batch = logBatchCommand.doRun()
+ batch = logBatchCommand.doRun()
if (BatchUtil.isTerminalState(batch.getState) && !BatchUtil.isFinishedState(batch.getState)) {
error(s"Batch ${batch.getId} failed: ${JsonUtil.toJson(batch)}")
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala
index 67fe497e1..738cdc2e3 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala
@@ -62,15 +62,11 @@ private[ctl] object Validator {
}
def validateFilename(cliConfig: CliConfig): Unit = {
- var filename = cliConfig.createOpts.filename
+ val filename = cliConfig.createOpts.filename
if (StringUtils.isBlank(filename)) {
fail(s"Config file is not specified.")
}
- val currentPath = Paths.get("").toAbsolutePath
- if (!filename.startsWith("/")) {
- filename = currentPath + "/" + filename
- }
if (!Files.exists(Paths.get(filename))) {
fail(s"Config file does not exist: ${filename}.")
}
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala
index d0d41a0a4..a958a1b6e 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala
@@ -73,20 +73,46 @@ class BatchCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
}
}
- test("get batch without batch id specified") {
+ test("submit batch default option") {
val args = Array(
- "get",
- "batch")
- testPrematureExitForControlCliArgs(args, "Must specify batchId for get batch command")
+ "submit",
+ "batch",
+ "-f",
+ "src/test/resources/cli/batch.yaml")
+ val opArgs = new ControlCliArguments(args)
+ assert(opArgs.cliConfig.batchOpts.waitCompletion == true)
}
- test("get batch") {
+ test("submit batch without waitForCompletion") {
val args = Array(
- "get",
+ "submit",
"batch",
- "f7fd702c-e54e-11ec-8fea-0242ac120002")
+ "-f",
+ "src/test/resources/cli/batch.yaml",
+ "--waitCompletion",
+ "false")
val opArgs = new ControlCliArguments(args)
- assert(opArgs.cliConfig.batchOpts.batchId == "f7fd702c-e54e-11ec-8fea-0242ac120002")
+ assert(opArgs.cliConfig.batchOpts.waitCompletion == false)
+ }
+
+ test("get/delete batch") {
+ Seq("get", "delete").foreach { op =>
+ val args = Seq(
+ op,
+ "batch",
+ "f7fd702c-e54e-11ec-8fea-0242ac120002")
+ val opArgs = new ControlCliArguments(args)
+ assert(opArgs.cliConfig.batchOpts.batchId == "f7fd702c-e54e-11ec-8fea-0242ac120002")
+ }
+ }
+
+ test("get/delete batch without batch id specified") {
+ Seq("get", "delete").foreach { op =>
+ val args = Array(
+ op,
+ "batch")
+ testPrematureExitForControlCliArgs(args, s"Must specify batchId for ${op} batch command")
+ }
}
test("test list batch option") {
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
index 11804d999..17aa31d12 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
@@ -337,6 +337,9 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
val zkHelpString = "The connection string for the zookeeper ensemble, using zk quorum manually."
val versionHelpString = "Using the compiled KYUUBI_VERSION default," +
" change it if the active service is running in another."
+ val waitBatchCompletionHelpString = "Boolean property. If true(default), the client process " +
+ "will stay alive until the batch is in any terminal state. If false, the client will exit " +
+ "when the batch is no longer in PENDING state."
val helpString =
s"""kyuubi $KYUUBI_VERSION
|Usage: kyuubi-ctl [create|get|delete|list|log|submit] [options] <args>...
@@ -435,6 +438,8 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
| -f, --filename <value> Filename to use to create the resource
|Command: submit batch
|${"\t"}open batch session and wait for completion.
+ | --waitCompletion <value>
+ | ${waitBatchCompletionHelpString}
|
| -h, --help Show help message and exit.""".stripMargin
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/KyuubiRestClient.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/KyuubiRestClient.java
index 309e3dbec..93857297f 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/KyuubiRestClient.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/KyuubiRestClient.java
@@ -24,10 +24,16 @@ import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.kyuubi.client.auth.*;
-public class KyuubiRestClient implements AutoCloseable {
+public class KyuubiRestClient implements AutoCloseable, Cloneable {
private IRestClient httpClient;
+ private RestClientConf conf;
+
+ private List<String> baseUrls;
+
+ private ApiVersion version;
+
private AuthHeaderGenerator authHeaderGenerator;
/** Specifies the version of the Kyuubi API to communicate with. */
@@ -52,24 +58,43 @@ public class KyuubiRestClient implements AutoCloseable {
}
}
+ @Override
+ public KyuubiRestClient clone() {
+ KyuubiRestClient kyuubiRestClient = new KyuubiRestClient();
+ kyuubiRestClient.version = this.version;
+ kyuubiRestClient.conf = this.conf;
+ kyuubiRestClient.baseUrls = this.baseUrls;
+ kyuubiRestClient.httpClient = RetryableRestClient.getRestClient(this.baseUrls, this.conf);
+ kyuubiRestClient.authHeaderGenerator = this.authHeaderGenerator;
+ return kyuubiRestClient;
+ }
+
+ public void setHostUrls(String... hostUrls) {
+ setHostUrls(Arrays.asList(hostUrls));
+ }
+
+ public void setHostUrls(List<String> hostUrls) {
+ if (hostUrls.isEmpty()) {
+ throw new IllegalArgumentException("hostUrls cannot be blank.");
+ }
+ List<String> baseUrls = initBaseUrls(hostUrls, version);
+ this.httpClient = RetryableRestClient.getRestClient(baseUrls, this.conf);
+ }
+
private KyuubiRestClient() {}
private KyuubiRestClient(Builder builder) {
- List<String> baseUrls = new LinkedList<>();
- for (String hostUrl : builder.hostUrls) {
- // Remove the trailing "/" from the hostUrl if present
- String baseUrl =
- String.format("%s/%s", hostUrl.replaceAll("/$", ""), builder.version.getApiNamespace());
- baseUrls.add(baseUrl);
- }
+ this.version = builder.version;
+ this.baseUrls = initBaseUrls(builder.hostUrls, builder.version);
RestClientConf conf = new RestClientConf();
conf.setConnectTimeout(builder.connectTimeout);
conf.setSocketTimeout(builder.socketTimeout);
conf.setMaxAttempts(builder.maxAttempts);
conf.setAttemptWaitTime(builder.attemptWaitTime);
+ this.conf = conf;
- this.httpClient = RetryableRestClient.getRestClient(baseUrls, conf);
+ this.httpClient = RetryableRestClient.getRestClient(this.baseUrls, conf);
switch (builder.authHeaderMethod) {
case BASIC:
@@ -89,6 +114,17 @@ public class KyuubiRestClient implements AutoCloseable {
}
}
+ private List<String> initBaseUrls(List<String> hostUrls, ApiVersion version) {
+ List<String> baseUrls = new LinkedList<>();
+ for (String hostUrl : hostUrls) {
+ // Remove the trailing "/" from the hostUrl if present
+ String baseUrl =
+ String.format("%s/%s", hostUrl.replaceAll("/$", ""), version.getApiNamespace());
+ baseUrls.add(baseUrl);
+ }
+ return baseUrls;
+ }
+
public String getAuthHeader() {
return authHeaderGenerator.generateAuthHeader();
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index 09666f1c3..39e33861a 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -56,9 +56,11 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
| - x2
| configs:
| spark.master: local
- | spark.${ENGINE_SPARK_MAX_LIFETIME.key}: "5000"
- | spark.${ENGINE_CHECK_INTERVAL.key}: "1000"
+ | wait.completion: true
+ | spark.${ENGINE_SPARK_MAX_LIFETIME.key}: 5000
+ | spark.${ENGINE_CHECK_INTERVAL.key}: 1000
| k1: v1
+ | 1: test_integer_key
|options:
| verbose: true""".stripMargin
Files.write(Paths.get(batchFile), batch_basic.getBytes(StandardCharsets.UTF_8))
@@ -115,7 +117,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
ldapUserPasswd)
result = testPrematureExitForControlCli(logArgs, "")
val rows = result.split("\n")
- assert(rows.length > 0)
+ assert(rows.length == 2)
val deleteArgs = Array(
"delete",
@@ -165,7 +167,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
"spnego")
result = testPrematureExitForControlCli(logArgs, "")
val rows = result.split("\n")
- assert(rows.length > 0)
+ assert(rows.length == 2)
val deleteArgs = Array(
"delete",
@@ -184,7 +186,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
batchFile,
"--password",
ldapUserPasswd)
- val result = testPrematureExitForControlCli(createArgs, "")
+ var result = testPrematureExitForControlCli(createArgs, "")
assert(result.contains("Type: SPARK"))
assert(result.contains(s"Kyuubi Instance: ${fe.connectionUrl}"))
val startIndex = result.indexOf("Id: ") + 4
@@ -202,7 +204,9 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
"--password",
ldapUserPasswd,
"--forward")
- testPrematureExitForControlCli(logArgs, s"Submitted application: ${appName}")
+ result = testPrematureExitForControlCli(logArgs, "")
+ assert(result.contains(s"Submitted application: ${appName}"))
+ assert(result.contains("ShutdownHookManager: Shutdown hook called"))
}
test("submit batch test") {
@@ -213,7 +217,26 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
batchFile,
"--password",
ldapUserPasswd)
- testPrematureExitForControlCli(submitArgs, s"Submitted application: ${appName}")
+ val result = testPrematureExitForControlCli(submitArgs, "")
+ assert(result.contains(s"Submitted application: ${appName}"))
+ assert(result.contains("ShutdownHookManager: Shutdown hook called"))
+ }
+
+ test("submit batch test with waitCompletion=false") {
+ val submitArgs = Array(
+ "submit",
+ "batch",
+ "-f",
+ batchFile,
+ "--password",
+ ldapUserPasswd,
+ "--waitCompletion",
+ "false")
+ val result = testPrematureExitForControlCli(submitArgs, "")
+ assert(result.contains(s"/bin/spark-submit"))
+ assert(!result.contains("ShutdownHookManager: Shutdown hook called"))
+ val numberOfRows = result.split("\n").length
+ assert(numberOfRows <= 100)
}
test("list batch test") {