You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/20 03:56:42 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2628][FOLLOWUP] Support waitCompletion for submit batch

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c664c84f8 [KYUUBI #2628][FOLLOWUP] Support waitCompletion for submit batch
c664c84f8 is described below

commit c664c84f80ef04de2956f36ae79d52d303613fed
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Mon Jun 20 11:56:36 2022 +0800

    [KYUUBI #2628][FOLLOWUP] Support waitCompletion for submit batch
    
    ### _Why are the changes needed?_
    
    Support waitCompletion for submit batch
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #2909 from lightning-L/kyuubi-2628-1.
    
    Closes #2628
    
    23fe761f [Tianlin Liao] minor fix
    06cd66ff [Tianlin Liao] support waitUntilComplete for submit batch; use kyuubi instance url for log batch/submit batch
    
    Authored-by: Tianlin Liao <ti...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 docs/deployment/settings.md                        |  1 +
 .../scala/org/apache/kyuubi/ctl/CliConfig.scala    |  3 +-
 .../scala/org/apache/kyuubi/ctl/CommandLine.scala  | 17 +++-
 .../main/scala/org/apache/kyuubi/ctl/CtlConf.scala |  7 ++
 .../org/apache/kyuubi/ctl/RestClientFactory.scala  | 12 +++
 .../scala/org/apache/kyuubi/ctl/cmd/Command.scala  | 28 -------
 .../kyuubi/ctl/cmd/create/CreateBatchCommand.scala | 11 ++-
 .../kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala |  6 +-
 .../kyuubi/ctl/cmd/log/LogBatchCommand.scala       | 91 +++++++++++++++-------
 .../kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala |  9 ++-
 .../org/apache/kyuubi/ctl/util/Validator.scala     |  6 +-
 .../apache/kyuubi/ctl/BatchCliArgumentsSuite.scala | 42 ++++++++--
 .../kyuubi/ctl/ControlCliArgumentsSuite.scala      |  5 ++
 .../org/apache/kyuubi/client/KyuubiRestClient.java | 54 ++++++++++---
 .../kyuubi/server/rest/client/BatchCliSuite.scala  | 37 +++++++--
 15 files changed, 233 insertions(+), 96 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 36980ceb5..8e2645cdf 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -187,6 +187,7 @@ kyuubi.credentials.update.wait.timeout|PT1M|How long to wait until credentials a
 
 Key | Default | Meaning | Type | Since
 --- | --- | --- | --- | ---
+kyuubi.ctl.batch.log.query.interval|PT3S|The interval for fetching batch logs.|duration|1.6.0
 kyuubi.ctl.rest.auth.schema|basic|The authentication schema. Valid values are: basic, spnego.|string|1.6.0
 kyuubi.ctl.rest.base.url|&lt;undefined&gt;|The REST API base URL, which contains the scheme (http:// or https://), host name, port number|string|1.6.0
 kyuubi.ctl.rest.request.attempt.wait|PT3S|How long to wait between attempts of ctl rest request.|duration|1.6.0
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
index 2b559d012..d5c8d49e5 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
@@ -65,7 +65,8 @@ case class BatchOpts(
     endTime: Long = 0,
     from: Int = -1,
     size: Int = 100,
-    hs2ProxyUser: String = null)
+    hs2ProxyUser: String = null,
+    waitCompletion: Boolean = true)
 
 case class EngineOpts(
     user: String = null,
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
index b1bef0224..2dc8be824 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
@@ -105,7 +105,7 @@ object CommandLine {
           opt[String]('f', "filename")
             .action((v, c) => c.copy(createOpts = c.createOpts.copy(filename = v)))
             .text("Filename to use to create the resource"),
-          batchCmd(builder).text("\tOpen batch session."),
+          createBatchCmd(builder).text("\tOpen batch session."),
           serverCmd(builder).text("\tExpose Kyuubi server instance to another domain.")))
   }
 
@@ -176,7 +176,7 @@ object CommandLine {
           opt[String]('f', "filename")
             .action((v, c) => c.copy(createOpts = c.createOpts.copy(filename = v)))
             .text("Filename to use to create the resource"),
-          batchCmd(builder).text("\topen batch session and wait for completion.")))
+          submitBatchCmd(builder).text("\topen batch session and wait for completion.")))
   }
 
   private def serverCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
@@ -202,7 +202,7 @@ object CommandLine {
           .text("The engine share level this engine belong to."))
   }
 
-  private def batchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+  private def createBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
     import builder._
     cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
   }
@@ -307,4 +307,15 @@ object CommandLine {
           .text("The max number of records returned in the query."))
   }
 
+  private def submitBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+    import builder._
+    cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+      .children(
+        opt[Boolean]("waitCompletion")
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(waitCompletion = v)))
+          .text("Boolean property. If true(default), the client process will stay alive " +
+            "until the batch is in any terminal state. If false, the client will exit " +
+            "when the batch is no longer in PENDING state."))
+  }
+
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala
index 9a57120a9..e09a89118 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala
@@ -60,4 +60,11 @@ object CtlConf {
       .version("1.6.0")
       .timeConf
       .createWithDefault(Duration.ofSeconds(3).toMillis)
+
+  val CTL_BATCH_LOG_QUERY_INTERVAL =
+    buildConf("kyuubi.ctl.batch.log.query.interval")
+      .doc("The interval for fetching batch logs.")
+      .version("1.6.0")
+      .timeConf
+      .createWithDefault(Duration.ofSeconds(3).toMillis)
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
index f3fdf6223..a3acbbe0a 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
@@ -40,6 +40,18 @@ object RestClientFactory {
     }
   }
 
+  private[ctl] def withKyuubiInstanceRestClient(
+      kyuubiRestClient: KyuubiRestClient,
+      kyuubiInstance: String)(f: KyuubiRestClient => Unit): Unit = {
+    val kyuubiInstanceRestClient = kyuubiRestClient.clone()
+    kyuubiInstanceRestClient.setHostUrls(s"http://${kyuubiInstance}")
+    try {
+      f(kyuubiInstanceRestClient)
+    } finally {
+      kyuubiInstanceRestClient.close()
+    }
+  }
+
   private def getKyuubiRestClient(
       cliConfig: CliConfig,
       map: JMap[String, Object],
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
index 168b789a7..7d0fc19a7 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
@@ -16,12 +16,6 @@
  */
 package org.apache.kyuubi.ctl.cmd
 
-import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-
-import org.yaml.snakeyaml.Yaml
-
 import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ctl.CliConfig
@@ -30,8 +24,6 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
 
 abstract class Command[T](cliConfig: CliConfig) extends Logging {
 
-  protected val DEFAULT_LOG_QUERY_INTERVAL: Int = 1000
-
   val conf = KyuubiConf().loadFileDefaults()
 
   cliConfig.conf.foreach { case (key, value) =>
@@ -91,26 +83,6 @@ abstract class Command[T](cliConfig: CliConfig) extends Logging {
     arguments
   }
 
-  private[ctl] def readConfig(): HashMap[String, Object] = {
-    var filename = normalizedCliConfig.createOpts.filename
-
-    var map: HashMap[String, Object] = null
-    var br: BufferedReader = null
-    try {
-      val yaml = new Yaml()
-      val input = new FileInputStream(new File(filename))
-      br = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
-      map = yaml.load(br).asInstanceOf[HashMap[String, Object]]
-    } catch {
-      case e: Exception => fail(s"Failed to read yaml file[$filename]: $e")
-    } finally {
-      if (br != null) {
-        br.close()
-      }
-    }
-    map
-  }
-
   override def info(msg: => Any): Unit = printMessage(msg)
 
   override def warn(msg: => Any): Unit = printMessage(s"Warning: $msg")
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala
index 01c48b50e..90a375355 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala
@@ -16,7 +16,10 @@
  */
 package org.apache.kyuubi.ctl.cmd.create
 
-import java.util.{ArrayList, HashMap}
+import java.util.{Map => JMap}
+import java.util.ArrayList
+
+import scala.collection.JavaConverters._
 
 import org.apache.kyuubi.client.BatchRestApi
 import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
@@ -37,13 +40,15 @@ class CreateBatchCommand(cliConfig: CliConfig) extends Command[Batch](cliConfig)
     withKyuubiRestClient(normalizedCliConfig, map, conf) { kyuubiRestClient =>
       val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
 
-      val request = map.get("request").asInstanceOf[HashMap[String, Object]]
+      val request = map.get("request").asInstanceOf[JMap[String, Object]]
+      val config = request.get("configs").asInstanceOf[JMap[Object, Object]].asScala
+        .map { case (k, v) => (k.toString, v.toString) }.asJava
       val batchRequest = new BatchRequest(
         map.get("batchType").asInstanceOf[String],
         request.get("resource").asInstanceOf[String],
         request.get("className").asInstanceOf[String],
         request.get("name").asInstanceOf[String],
-        request.get("configs").asInstanceOf[HashMap[String, String]],
+        config,
         request.get("args").asInstanceOf[ArrayList[String]])
 
       batchRestApi.createBatch(batchRequest)
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
index 6f664d334..24f014d7e 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
@@ -25,7 +25,11 @@ import org.apache.kyuubi.ctl.cmd.Command
 import org.apache.kyuubi.ctl.util.BatchUtil
 
 class DeleteBatchCommand(cliConfig: CliConfig) extends Command[Batch](cliConfig) {
-  def validate(): Unit = {}
+  def validate(): Unit = {
+    if (normalizedCliConfig.batchOpts.batchId == null) {
+      fail("Must specify batchId for delete batch command.")
+    }
+  }
 
   def doRun(): Batch = {
     withKyuubiRestClient(normalizedCliConfig, null, conf) { kyuubiRestClient =>
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
index 0a23e028a..daea4f399 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
@@ -22,13 +22,16 @@ import scala.collection.JavaConverters._
 
 import org.apache.kyuubi.client.BatchRestApi
 import org.apache.kyuubi.client.api.v1.dto.{Batch, OperationLog}
-import org.apache.kyuubi.client.exception.KyuubiRestException
 import org.apache.kyuubi.ctl.CliConfig
-import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.CtlConf._
+import org.apache.kyuubi.ctl.RestClientFactory.{withKyuubiInstanceRestClient, withKyuubiRestClient}
 import org.apache.kyuubi.ctl.cmd.Command
 import org.apache.kyuubi.ctl.util.{BatchUtil, Render}
 
-class LogBatchCommand(cliConfig: CliConfig, restConfigMap: JMap[String, Object] = null)
+class LogBatchCommand(
+    cliConfig: CliConfig,
+    batch: Option[Batch] = None,
+    restConfigMap: JMap[String, Object] = null)
   extends Command[Batch](cliConfig) {
 
   def validate(): Unit = {
@@ -46,41 +49,73 @@ class LogBatchCommand(cliConfig: CliConfig, restConfigMap: JMap[String, Object]
 
       var log: OperationLog = null
       var done = false
-      var batch: Batch = null
+      var batch = this.batch.getOrElse(batchRestApi.getBatchById(batchId))
+      val kyuubiInstance = batch.getKyuubiInstance
 
-      while (!done) {
-        try {
-          log = batchRestApi.getBatchLocalLog(
-            batchId,
-            from,
-            size)
-          from += log.getLogRowSet.size
-          log.getLogRowSet.asScala.foreach(x => info(x))
-          if (!normalizedCliConfig.logOpts.forward) {
-            done = true
-          }
-        } catch {
-          case e: KyuubiRestException =>
-            error(s"Error fetching batch logs: ${e.getMessage}")
-        }
+      withKyuubiInstanceRestClient(kyuubiRestClient, kyuubiInstance) { kyuubiInstanceRestClient =>
+        val kyuubiInstanceBatchRestApi: BatchRestApi = new BatchRestApi(kyuubiInstanceRestClient)
+        while (!done) {
+          try {
+            log = kyuubiInstanceBatchRestApi.getBatchLocalLog(
+              batchId,
+              from,
+              size)
+            from += log.getLogRowSet.size
+            log.getLogRowSet.asScala.foreach(x => info(x))
 
-        if (log == null || log.getLogRowSet.size() == 0) {
-          batch = batchRestApi.getBatchById(batchId)
-          if (BatchUtil.isTerminalState(batch.getState)) {
-            done = true
+            val (latestBatch, shouldFinishLog) =
+              checkStatus(kyuubiInstanceBatchRestApi, batchId, log)
+            batch = latestBatch
+            done = shouldFinishLog
+          } catch {
+            case e: Exception =>
+              val (latestBatch, shouldFinishLog) = checkStatus(batchRestApi, batchId, log)
+              batch = latestBatch
+              done = shouldFinishLog
+              if (done) {
+                error(s"Error fetching batch logs: ${e.getMessage}")
+              }
           }
-        }
 
-        if (!done) {
-          Thread.sleep(DEFAULT_LOG_QUERY_INTERVAL)
+          if (!done) {
+            Thread.sleep(conf.get(CTL_BATCH_LOG_QUERY_INTERVAL).toInt)
+          }
         }
       }
-
       batch
     }
   }
 
   def render(batch: Batch): Unit = {
-    info(Render.renderBatchInfo(batch))
+    if (normalizedCliConfig.logOpts.forward) {
+      info(Render.renderBatchInfo(batch))
+    }
+  }
+
+  private def checkStatus(
+      batchRestApi: BatchRestApi,
+      batchId: String,
+      log: OperationLog): (Batch, Boolean) = {
+    var batch: Batch = null
+
+    if (!normalizedCliConfig.logOpts.forward) {
+      return (batch, true)
+    }
+
+    if (normalizedCliConfig.batchOpts.waitCompletion) {
+      if (log == null || log.getLogRowSet.size == 0) {
+        batch = batchRestApi.getBatchById(batchId)
+        if (BatchUtil.isTerminalState(batch.getState)) {
+          return (batch, true)
+        }
+      }
+    } else {
+      batch = batchRestApi.getBatchById(batchId)
+      if (!BatchUtil.isPendingState(batch.getState)) {
+        return (batch, true)
+      }
+    }
+
+    (batch, false)
   }
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
index 119553fe9..b81dac646 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
@@ -34,14 +34,17 @@ class SubmitBatchCommand(cliConfig: CliConfig) extends Command[Batch](cliConfig)
     val map = CtlUtils.loadYamlAsMap(normalizedCliConfig)
 
     val createBatchCommand = new CreateBatchCommand(normalizedCliConfig)
-    val batchId = createBatchCommand.doRun().getId
+    var batch = createBatchCommand.doRun()
 
     val logBatchCommand = new LogBatchCommand(
       normalizedCliConfig.copy(
-        batchOpts = BatchOpts(batchId = batchId),
+        batchOpts = BatchOpts(
+          batchId = batch.getId,
+          waitCompletion = normalizedCliConfig.batchOpts.waitCompletion),
         logOpts = LogOpts(forward = true)),
+      Some(batch),
       map)
-    val batch = logBatchCommand.doRun()
+    batch = logBatchCommand.doRun()
 
     if (BatchUtil.isTerminalState(batch.getState) && !BatchUtil.isFinishedState(batch.getState)) {
       error(s"Batch ${batch.getId} failed: ${JsonUtil.toJson(batch)}")
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala
index 67fe497e1..738cdc2e3 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala
@@ -62,15 +62,11 @@ private[ctl] object Validator {
   }
 
   def validateFilename(cliConfig: CliConfig): Unit = {
-    var filename = cliConfig.createOpts.filename
+    val filename = cliConfig.createOpts.filename
     if (StringUtils.isBlank(filename)) {
       fail(s"Config file is not specified.")
     }
 
-    val currentPath = Paths.get("").toAbsolutePath
-    if (!filename.startsWith("/")) {
-      filename = currentPath + "/" + filename
-    }
     if (!Files.exists(Paths.get(filename))) {
       fail(s"Config file does not exist: ${filename}.")
     }
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala
index d0d41a0a4..a958a1b6e 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala
@@ -73,20 +73,46 @@ class BatchCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
     }
   }
 
-  test("get batch without batch id specified") {
+  test("submit batch default option") {
     val args = Array(
-      "get",
-      "batch")
-    testPrematureExitForControlCliArgs(args, "Must specify batchId for get batch command")
+      "submit",
+      "batch",
+      "-f",
+      "src/test/resources/cli/batch.yaml")
+    val opArgs = new ControlCliArguments(args)
+    assert(opArgs.cliConfig.batchOpts.waitCompletion == true)
   }
 
-  test("get batch") {
+  test("submit batch without waitForCompletion") {
     val args = Array(
-      "get",
+      "submit",
       "batch",
-      "f7fd702c-e54e-11ec-8fea-0242ac120002")
+      "-f",
+      "src/test/resources/cli/batch.yaml",
+      "--waitCompletion",
+      "false")
     val opArgs = new ControlCliArguments(args)
-    assert(opArgs.cliConfig.batchOpts.batchId == "f7fd702c-e54e-11ec-8fea-0242ac120002")
+    assert(opArgs.cliConfig.batchOpts.waitCompletion == false)
+  }
+
+  test("get/delete batch") {
+    Seq("get", "delete").foreach { op =>
+      val args = Seq(
+        op,
+        "batch",
+        "f7fd702c-e54e-11ec-8fea-0242ac120002")
+      val opArgs = new ControlCliArguments(args)
+      assert(opArgs.cliConfig.batchOpts.batchId == "f7fd702c-e54e-11ec-8fea-0242ac120002")
+    }
+  }
+
+  test("get/delete batch without batch id specified") {
+    Seq("get", "delete").foreach { op =>
+      val args = Array(
+        op,
+        "batch")
+      testPrematureExitForControlCliArgs(args, s"Must specify batchId for ${op} batch command")
+    }
   }
 
   test("test list batch option") {
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
index 11804d999..17aa31d12 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
@@ -337,6 +337,9 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
     val zkHelpString = "The connection string for the zookeeper ensemble, using zk quorum manually."
     val versionHelpString = "Using the compiled KYUUBI_VERSION default," +
       " change it if the active service is running in another."
+    val waitBatchCompletionHelpString = "Boolean property. If true(default), the client process " +
+      "will stay alive until the batch is in any terminal state. If false, the client will exit " +
+      "when the batch is no longer in PENDING state."
     val helpString =
       s"""kyuubi $KYUUBI_VERSION
          |Usage: kyuubi-ctl [create|get|delete|list|log|submit] [options] <args>...
@@ -435,6 +438,8 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
          |  -f, --filename <value>   Filename to use to create the resource
          |Command: submit batch
          |${"\t"}open batch session and wait for completion.
+         |  --waitCompletion <value>
+         |                           ${waitBatchCompletionHelpString}
          |
          |  -h, --help               Show help message and exit.""".stripMargin
 
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/KyuubiRestClient.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/KyuubiRestClient.java
index 309e3dbec..93857297f 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/KyuubiRestClient.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/KyuubiRestClient.java
@@ -24,10 +24,16 @@ import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kyuubi.client.auth.*;
 
-public class KyuubiRestClient implements AutoCloseable {
+public class KyuubiRestClient implements AutoCloseable, Cloneable {
 
   private IRestClient httpClient;
 
+  private RestClientConf conf;
+
+  private List<String> baseUrls;
+
+  private ApiVersion version;
+
   private AuthHeaderGenerator authHeaderGenerator;
 
   /** Specifies the version of the Kyuubi API to communicate with. */
@@ -52,24 +58,43 @@ public class KyuubiRestClient implements AutoCloseable {
     }
   }
 
+  @Override
+  public KyuubiRestClient clone() {
+    KyuubiRestClient kyuubiRestClient = new KyuubiRestClient();
+    kyuubiRestClient.version = this.version;
+    kyuubiRestClient.conf = this.conf;
+    kyuubiRestClient.baseUrls = this.baseUrls;
+    kyuubiRestClient.httpClient = RetryableRestClient.getRestClient(this.baseUrls, this.conf);
+    kyuubiRestClient.authHeaderGenerator = this.authHeaderGenerator;
+    return kyuubiRestClient;
+  }
+
+  public void setHostUrls(String... hostUrls) {
+    setHostUrls(Arrays.asList(hostUrls));
+  }
+
+  public void setHostUrls(List<String> hostUrls) {
+    if (hostUrls.isEmpty()) {
+      throw new IllegalArgumentException("hostUrls cannot be blank.");
+    }
+    List<String> baseUrls = initBaseUrls(hostUrls, version);
+    this.httpClient = RetryableRestClient.getRestClient(baseUrls, this.conf);
+  }
+
   private KyuubiRestClient() {}
 
   private KyuubiRestClient(Builder builder) {
-    List<String> baseUrls = new LinkedList<>();
-    for (String hostUrl : builder.hostUrls) {
-      // Remove the trailing "/" from the hostUrl if present
-      String baseUrl =
-          String.format("%s/%s", hostUrl.replaceAll("/$", ""), builder.version.getApiNamespace());
-      baseUrls.add(baseUrl);
-    }
+    this.version = builder.version;
+    this.baseUrls = initBaseUrls(builder.hostUrls, builder.version);
 
     RestClientConf conf = new RestClientConf();
     conf.setConnectTimeout(builder.connectTimeout);
     conf.setSocketTimeout(builder.socketTimeout);
     conf.setMaxAttempts(builder.maxAttempts);
     conf.setAttemptWaitTime(builder.attemptWaitTime);
+    this.conf = conf;
 
-    this.httpClient = RetryableRestClient.getRestClient(baseUrls, conf);
+    this.httpClient = RetryableRestClient.getRestClient(this.baseUrls, conf);
 
     switch (builder.authHeaderMethod) {
       case BASIC:
@@ -89,6 +114,17 @@ public class KyuubiRestClient implements AutoCloseable {
     }
   }
 
+  private List<String> initBaseUrls(List<String> hostUrls, ApiVersion version) {
+    List<String> baseUrls = new LinkedList<>();
+    for (String hostUrl : hostUrls) {
+      // Remove the trailing "/" from the hostUrl if present
+      String baseUrl =
+          String.format("%s/%s", hostUrl.replaceAll("/$", ""), version.getApiNamespace());
+      baseUrls.add(baseUrl);
+    }
+    return baseUrls;
+  }
+
   public String getAuthHeader() {
     return authHeaderGenerator.generateAuthHeader();
   }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index 09666f1c3..39e33861a 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -56,9 +56,11 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
                          |   - x2
                          |  configs:
                          |    spark.master: local
-                         |    spark.${ENGINE_SPARK_MAX_LIFETIME.key}: "5000"
-                         |    spark.${ENGINE_CHECK_INTERVAL.key}: "1000"
+                         |    wait.completion: true
+                         |    spark.${ENGINE_SPARK_MAX_LIFETIME.key}: 5000
+                         |    spark.${ENGINE_CHECK_INTERVAL.key}: 1000
                          |    k1: v1
+                         |    1: test_integer_key
                          |options:
                          |  verbose: true""".stripMargin
     Files.write(Paths.get(batchFile), batch_basic.getBytes(StandardCharsets.UTF_8))
@@ -115,7 +117,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
       ldapUserPasswd)
     result = testPrematureExitForControlCli(logArgs, "")
     val rows = result.split("\n")
-    assert(rows.length > 0)
+    assert(rows.length == 2)
 
     val deleteArgs = Array(
       "delete",
@@ -165,7 +167,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
       "spnego")
     result = testPrematureExitForControlCli(logArgs, "")
     val rows = result.split("\n")
-    assert(rows.length > 0)
+    assert(rows.length == 2)
 
     val deleteArgs = Array(
       "delete",
@@ -184,7 +186,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
       batchFile,
       "--password",
       ldapUserPasswd)
-    val result = testPrematureExitForControlCli(createArgs, "")
+    var result = testPrematureExitForControlCli(createArgs, "")
     assert(result.contains("Type: SPARK"))
     assert(result.contains(s"Kyuubi Instance: ${fe.connectionUrl}"))
     val startIndex = result.indexOf("Id: ") + 4
@@ -202,7 +204,9 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
       "--password",
       ldapUserPasswd,
       "--forward")
-    testPrematureExitForControlCli(logArgs, s"Submitted application: ${appName}")
+    result = testPrematureExitForControlCli(logArgs, "")
+    assert(result.contains(s"Submitted application: ${appName}"))
+    assert(result.contains("ShutdownHookManager: Shutdown hook called"))
   }
 
   test("submit batch test") {
@@ -213,7 +217,26 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
       batchFile,
       "--password",
       ldapUserPasswd)
-    testPrematureExitForControlCli(submitArgs, s"Submitted application: ${appName}")
+    val result = testPrematureExitForControlCli(submitArgs, "")
+    assert(result.contains(s"Submitted application: ${appName}"))
+    assert(result.contains("ShutdownHookManager: Shutdown hook called"))
+  }
+
+  test("submit batch test with waitCompletion=false") {
+    val submitArgs = Array(
+      "submit",
+      "batch",
+      "-f",
+      batchFile,
+      "--password",
+      ldapUserPasswd,
+      "--waitCompletion",
+      "false")
+    val result = testPrematureExitForControlCli(submitArgs, "")
+    assert(result.contains(s"/bin/spark-submit"))
+    assert(!result.contains("ShutdownHookManager: Shutdown hook called"))
+    val numberOfRows = result.split("\n").length
+    assert(numberOfRows <= 100)
   }
 
   test("list batch test") {