You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/14 03:19:27 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2628][SUB-TASK][KPIP-4] Implement kyuubi-ctl for batch job operation

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new cb4833859 [KYUUBI #2628][SUB-TASK][KPIP-4] Implement kyuubi-ctl for batch job operation
cb4833859 is described below

commit cb48338591686cfd7b6f8ea47c2b309995fec728
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Tue Jun 14 11:19:18 2022 +0800

    [KYUUBI #2628][SUB-TASK][KPIP-4] Implement kyuubi-ctl for batch job operation
    
    ### _Why are the changes needed?_
    
    To close #2628
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #2823 from lightning-L/kyuubi-2628.
    
    Closes #2628
    
    3af66099 [Tianlin Liao] refactor
    19ca40fe [Tianlin Liao] fix list/log batch
    d0fc0a9d [Tianlin Liao] refactor RestClientFactory
    617732c8 [Tianlin Liao] refactor command
    897afd38 [Tianlin Liao] rename ClientFactory to RestClientFactory
    9ed61166 [Tianlin Liao] add list batch option
    953e7169 [Tianlin Liao] refactor kyuubi-ctl conf
    ad2a9b6f [Tianlin Liao] fix license and style
    60943eb2 [Tianlin Liao] fix delete/list operation after rebase
    97cbe9df [Tianlin Liao] add kyuubi rest config: hostUrl, spnegoHost, authSchema
    c46e8f7a [Tianlin Liao] implement create/get/delete/list/log/submit batch command
    71c9cb9a [Tianlin Liao] reorganize cmd folder and rename command class
    cf7478bb [Tianlin Liao] use reflect to generate command object
    e985c7bb [Tianlin Liao] rename ServiceControlXXX to ControlXXX
    
    Authored-by: Tianlin Liao <ti...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 bin/kyuubi-ctl                                     |   4 +-
 docs/deployment/settings.md                        |   9 +
 kyuubi-ctl/pom.xml                                 |  28 ++
 .../scala/org/apache/kyuubi/ctl/CliConfig.scala    |  39 ++-
 .../scala/org/apache/kyuubi/ctl/CommandLine.scala  | 186 ++++++++++++-
 .../scala/org/apache/kyuubi/ctl/ControlCli.scala   |   5 +-
 .../apache/kyuubi/ctl/ControlCliArguments.scala    | 108 +++++---
 .../main/scala/org/apache/kyuubi/ctl/CtlConf.scala |  48 ++++
 .../org/apache/kyuubi/ctl/RestClientFactory.scala  | 115 ++++++++
 .../scala/org/apache/kyuubi/ctl/cmd/Command.scala  | 120 +++------
 .../org/apache/kyuubi/ctl/cmd/ListCommand.scala    |  53 ----
 .../kyuubi/ctl/cmd/create/CreateBatchCommand.scala |  54 ++++
 .../CreateServerCommand.scala}                     |  34 +--
 .../delete/DeleteBatchCommand.scala}               |  31 ++-
 .../ctl/cmd/{ => delete}/DeleteCommand.scala       |  27 +-
 .../delete/DeleteEngineCommand.scala}              |  20 +-
 .../delete/DeleteServerCommand.scala}              |  18 +-
 .../kyuubi/ctl/cmd/get/GetBatchCommand.scala       |  43 +++
 .../kyuubi/ctl/cmd/{ => get}/GetCommand.scala      |  32 +--
 .../get/GetEngineCommand.scala}                    |  20 +-
 .../get/GetServerCommand.scala}                    |  18 +-
 .../kyuubi/ctl/cmd/list/ListBatchCommand.scala     |  58 ++++
 .../{Render.scala => cmd/list/ListCommand.scala}   |  28 +-
 .../list/ListEngineCommand.scala}                  |  20 +-
 .../list/ListServerCommand.scala}                  |  18 +-
 .../kyuubi/ctl/cmd/log/LogBatchCommand.scala       |  71 +++++
 .../kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala |  76 ++++++
 .../{Render.scala => util/CommandLineUtils.scala}  |  33 ++-
 .../org/apache/kyuubi/ctl/util/CtlUtils.scala      | 110 ++++++++
 .../ctl/{Render.scala => util/DateTimeUtils.scala} |  28 +-
 .../scala/org/apache/kyuubi/ctl/util/Render.scala  |  70 +++++
 .../Tabulator.scala}                               |  24 +-
 .../org/apache/kyuubi/ctl/util/Validator.scala     |  80 ++++++
 kyuubi-ctl/src/test/resources/cli/batch.yaml       |  32 +++
 .../apache/kyuubi/ctl/BatchCliArgumentsSuite.scala | 253 ++++++++++++++++++
 .../kyuubi/ctl/ControlCliArgumentsSuite.scala      | 177 +++++++------
 .../org/apache/kyuubi/ctl/ControlCliSuite.scala    | 110 +++-----
 .../org/apache/kyuubi/ctl/TestPrematureExit.scala  | 102 +++++++
 .../org/apache/kyuubi/client/BatchRestApi.java     |   4 +-
 kyuubi-server/pom.xml                              |  15 ++
 .../kyuubi/config/AllKyuubiConfiguration.scala     |   4 +-
 .../kyuubi/server/rest/client/BatchCliSuite.scala  | 294 +++++++++++++++++++++
 pom.xml                                            |   1 +
 43 files changed, 2065 insertions(+), 555 deletions(-)

diff --git a/bin/kyuubi-ctl b/bin/kyuubi-ctl
index 3ff03ddff..d989eeb2d 100755
--- a/bin/kyuubi-ctl
+++ b/bin/kyuubi-ctl
@@ -16,8 +16,8 @@
 # limitations under the License.
 #
 
-## Kyuubi Service Control Client Entrance
-CLASS="org.apache.kyuubi.ctl.ServiceControlCli"
+## Kyuubi Control Client Entrance
+CLASS="org.apache.kyuubi.ctl.ControlCli"
 
 export KYUUBI_HOME="$(cd "$(dirname "$0")"/..; pwd)"
 
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 3650caa5d..54285fd86 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -183,6 +183,15 @@ kyuubi.credentials.renewal.retry.wait|PT1M|How long to wait before retrying to f
 kyuubi.credentials.update.wait.timeout|PT1M|How long to wait until credentials are ready.|duration|1.5.0
 
 
+### Ctl
+
+Key | Default | Meaning | Type | Since
+--- | --- | --- | --- | ---
+kyuubi.ctl.rest.auth.schema|basic|The authentication schema. Valid values are: basic, spnego.|string|1.6.0
+kyuubi.ctl.rest.base.url|&lt;undefined&gt;|The REST API base URL, which contains the scheme (http:// or https://), host name, port number|string|1.6.0
+kyuubi.ctl.rest.spnego.host|&lt;undefined&gt;|When auth schema is spnego, need to config spnego host.|string|1.6.0
+
+
 ### Delegation
 
 Key | Default | Meaning | Type | Since
diff --git a/kyuubi-ctl/pom.xml b/kyuubi-ctl/pom.xml
index c1235c645..984f8661e 100644
--- a/kyuubi-ctl/pom.xml
+++ b/kyuubi-ctl/pom.xml
@@ -44,6 +44,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-rest-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client-api</artifactId>
@@ -71,6 +77,12 @@
             <artifactId>scopt_${scala.binary.version}</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>${snakeyaml.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
@@ -113,5 +125,21 @@
     <build>
         <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
         <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>prepare-test-jar</id>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
     </build>
 </project>
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
index 1fc04b33b..19542aa6f 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CliConfig.scala
@@ -16,23 +16,26 @@
  */
 package org.apache.kyuubi.ctl
 
-import org.apache.kyuubi.ctl.ServiceControlAction.ControlAction
-import org.apache.kyuubi.ctl.ServiceControlObject.ControlObject
+import org.apache.kyuubi.ctl.ControlAction.ControlAction
+import org.apache.kyuubi.ctl.ControlObject.ControlObject
 
-private[ctl] object ServiceControlAction extends Enumeration {
+private[ctl] object ControlAction extends Enumeration {
   type ControlAction = Value
-  val CREATE, GET, DELETE, LIST = Value
+  val CREATE, GET, DELETE, LIST, LOG, SUBMIT = Value
 }
 
-private[ctl] object ServiceControlObject extends Enumeration {
+private[ctl] object ControlObject extends Enumeration {
   type ControlObject = Value
-  val SERVER, ENGINE = Value
+  val SERVER, ENGINE, BATCH = Value
 }
 
 case class CliConfig(
     action: ControlAction = null,
-    service: ControlObject = ServiceControlObject.SERVER,
+    resource: ControlObject = ControlObject.SERVER,
     commonOpts: CommonOpts = CommonOpts(),
+    createOpts: CreateOpts = CreateOpts(),
+    logOpts: LogOpts = LogOpts(),
+    batchOpts: BatchOpts = BatchOpts(),
     engineOpts: EngineOpts = EngineOpts())
 
 case class CommonOpts(
@@ -41,7 +44,27 @@ case class CommonOpts(
     host: String = null,
     port: String = null,
     version: String = null,
-    verbose: Boolean = false)
+    verbose: Boolean = false,
+    hostUrl: String = null,
+    authSchema: String = null,
+    username: String = null,
+    password: String = null,
+    spnegoHost: String = null)
+
+case class CreateOpts(filename: String = null)
+
+case class LogOpts(forward: Boolean = false)
+
+case class BatchOpts(
+    batchId: String = null,
+    batchType: String = null,
+    batchUser: String = null,
+    batchState: String = null,
+    createTime: Long = 0,
+    endTime: Long = 0,
+    from: Int = -1,
+    size: Int = 100,
+    hs2ProxyUser: String = null)
 
 case class EngineOpts(
     user: String = null,
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
index 903c6cd48..cc3fb7a62 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLine.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.ctl
 import scopt.{OParser, OParserBuilder}
 
 import org.apache.kyuubi.KYUUBI_VERSION
+import org.apache.kyuubi.ctl.util.DateTimeUtils._
 
 object CommandLine {
 
@@ -32,9 +33,14 @@ object CommandLine {
       get(builder),
       delete(builder),
       list(builder),
+      log(builder),
+      submit(builder),
       checkConfig(f => {
-        if (f.action == null) failure("Must specify action command: [create|get|delete|list].")
-        else success
+        if (f.action == null) {
+          failure("Must specify action command: [create|get|delete|list|log|submit].")
+        } else {
+          success
+        }
       }),
       note(""),
       help('h', "help").text("Show help message and exit."))
@@ -62,7 +68,22 @@ object CommandLine {
           " change it if the active service is running in another."),
       opt[Unit]('b', "verbose")
         .action((_, c) => c.copy(commonOpts = c.commonOpts.copy(verbose = true)))
-        .text("Print additional debug output."))
+        .text("Print additional debug output."),
+      opt[String]("hostUrl")
+        .action((v, c) => c.copy(commonOpts = c.commonOpts.copy(hostUrl = v)))
+        .text("Host url for rest api."),
+      opt[String]("authSchema")
+        .action((v, c) => c.copy(commonOpts = c.commonOpts.copy(authSchema = v)))
+        .text("Auth schema for rest api, valid values are basic, spnego."),
+      opt[String]("username")
+        .action((v, c) => c.copy(commonOpts = c.commonOpts.copy(username = v)))
+        .text("Username for basic authentication."),
+      opt[String]("password")
+        .action((v, c) => c.copy(commonOpts = c.commonOpts.copy(password = v)))
+        .text("Password for basic authentication."),
+      opt[String]("spnegoHost")
+        .action((v, c) => c.copy(commonOpts = c.commonOpts.copy(spnegoHost = v)))
+        .text("Spnego host for spnego authentication."))
   }
 
   private def create(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
@@ -70,8 +91,13 @@ object CommandLine {
     OParser.sequence(
       note(""),
       cmd("create")
-        .action((_, c) => c.copy(action = ServiceControlAction.CREATE))
+        .text("\tCreate a resource.")
+        .action((_, c) => c.copy(action = ControlAction.CREATE))
         .children(
+          opt[String]('f', "filename")
+            .action((v, c) => c.copy(createOpts = c.createOpts.copy(filename = v)))
+            .text("Filename to use to create the resource"),
+          batchCmd(builder).text("\tOpen batch session."),
           serverCmd(builder).text("\tExpose Kyuubi server instance to another domain.")))
   }
 
@@ -80,9 +106,10 @@ object CommandLine {
     OParser.sequence(
       note(""),
       cmd("get")
-        .text("\tGet the service/engine node info, host and port needed.")
-        .action((_, c) => c.copy(action = ServiceControlAction.GET))
+        .text("\tDisplay information about the specified resources.")
+        .action((_, c) => c.copy(action = ControlAction.GET))
         .children(
+          getBatchCmd(builder).text("\tGet batch by id."),
           serverCmd(builder).text("\tGet Kyuubi server info of domain"),
           engineCmd(builder).text("\tGet Kyuubi engine info belong to a user.")))
 
@@ -93,9 +120,10 @@ object CommandLine {
     OParser.sequence(
       note(""),
       cmd("delete")
-        .text("\tDelete the specified service/engine node, host and port needed.")
-        .action((_, c) => c.copy(action = ServiceControlAction.DELETE))
+        .text("\tDelete resources.")
+        .action((_, c) => c.copy(action = ControlAction.DELETE))
         .children(
+          deleteBatchCmd(builder).text("\tClose batch session."),
           serverCmd(builder).text("\tDelete the specified service node for a domain"),
           engineCmd(builder).text("\tDelete the specified engine node for user.")))
 
@@ -106,22 +134,51 @@ object CommandLine {
     OParser.sequence(
       note(""),
       cmd("list")
-        .text("\tList all the service/engine nodes for a particular domain.")
-        .action((_, c) => c.copy(action = ServiceControlAction.LIST))
+        .text("\tList information about resources.")
+        .action((_, c) => c.copy(action = ControlAction.LIST))
         .children(
+          listBatchCmd(builder).text("\tList batch session info."),
           serverCmd(builder).text("\tList all the service nodes for a particular domain"),
           engineCmd(builder).text("\tList all the engine nodes for a user")))
 
   }
 
+  private def log(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+    import builder._
+    OParser.sequence(
+      note(""),
+      cmd("log")
+        .text("\tPrint the logs for specified resource.")
+        .action((_, c) => c.copy(action = ControlAction.LOG))
+        .children(
+          opt[Unit]("forward")
+            .action((_, c) => c.copy(logOpts = c.logOpts.copy(forward = true)))
+            .text("If forward is specified, the ctl will block forever."),
+          logBatchCmd(builder).text("\tGet batch session local log.")))
+  }
+
+  private def submit(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+    import builder._
+    OParser.sequence(
+      note(""),
+      cmd("submit")
+        .text("\tCombination of create, get and log commands.")
+        .action((_, c) => c.copy(action = ControlAction.SUBMIT))
+        .children(
+          opt[String]('f', "filename")
+            .action((v, c) => c.copy(createOpts = c.createOpts.copy(filename = v)))
+            .text("Filename to use to create the resource"),
+          batchCmd(builder).text("\topen batch session and wait for completion.")))
+  }
+
   private def serverCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
     import builder._
-    cmd("server").action((_, c) => c.copy(service = ServiceControlObject.SERVER))
+    cmd("server").action((_, c) => c.copy(resource = ControlObject.SERVER))
   }
 
   private def engineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
     import builder._
-    cmd("engine").action((_, c) => c.copy(service = ServiceControlObject.ENGINE))
+    cmd("engine").action((_, c) => c.copy(resource = ControlObject.ENGINE))
       .children(
         opt[String]('u', "user")
           .action((v, c) => c.copy(engineOpts = c.engineOpts.copy(user = v)))
@@ -137,4 +194,109 @@ object CommandLine {
           .text("The engine share level this engine belong to."))
   }
 
+  private def batchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+    import builder._
+    cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+  }
+
+  private def getBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+    import builder._
+    cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+      .children(
+        arg[String]("<batchId>")
+          .optional()
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchId = v)))
+          .text("Batch id."))
+  }
+
+  private def deleteBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+    import builder._ // brings cmd/arg/opt into scope
+    cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+      .children(
+        arg[String]("<batchId>")
+          .optional()
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchId = v)))
+          .text("Batch id."),
+        opt[String]("hs2ProxyUser") // stored in batchOpts, not createOpts.filename
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(hs2ProxyUser = v)))
+          .text("The value of hive.server2.proxy.user config."))
+  }
+
+  private def listBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+    import builder._
+    cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+      .children(
+        opt[String]("batchType")
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchType = v)))
+          .text("Batch type."),
+        opt[String]("batchUser")
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchUser = v)))
+          .text("Batch user."),
+        opt[String]("batchState")
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchState = v)))
+          .text("Batch state."),
+        opt[String]("createTime")
+          .action((v, c) =>
+            c.copy(batchOpts = c.batchOpts.copy(createTime =
+              dateStringToMillis(v, "yyyyMMddHHmmss"))))
+          .validate(x =>
+            if (x.matches("\\d{14}")) {
+              success
+            } else {
+              failure("Option --createTime must be in yyyyMMddHHmmss format.")
+            })
+          .text("Batch create time, should be in yyyyMMddHHmmss format."),
+        opt[String]("endTime")
+          .action((v, c) =>
+            c.copy(batchOpts = c.batchOpts.copy(endTime =
+              dateStringToMillis(v, "yyyyMMddHHmmss"))))
+          .validate(x =>
+            if (x.matches("\\d{14}")) {
+              success
+            } else {
+              failure("Option --endTime must be in yyyyMMddHHmmss format.")
+            })
+          .text("Batch end time, should be in yyyyMMddHHmmss format."),
+        opt[Int]("from")
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(from = v)))
+          .validate(x =>
+            if (x >= 0) {
+              success
+            } else {
+              failure("Option --from must be >=0")
+            })
+          .text("Specify which record to start from retrieving info."),
+        opt[Int]("size")
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(size = v)))
+          .validate(x =>
+            if (x >= 0) {
+              success
+            } else {
+              failure("Option --size must be >=0")
+            })
+          .text("The max number of records returned in the query."))
+  }
+
+  private def logBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
+    import builder._
+    cmd("batch").action((_, c) => c.copy(resource = ControlObject.BATCH))
+      .children(
+        arg[String]("<batchId>")
+          .optional()
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchId = v)))
+          .text("Batch id."),
+        opt[Int]("from")
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(from = v)))
+          .text("Specify which record to start from retrieving info."),
+        opt[Int]("size")
+          .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(size = v)))
+          .validate(x =>
+            if (x >= 0) {
+              success
+            } else {
+              failure("Option --size must be >=0")
+            })
+          .text("The max number of records returned in the query."))
+  }
+
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCli.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCli.scala
index ab87568b4..93b87c981 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCli.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCli.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.ctl
 
 import org.apache.kyuubi.Logging
+import org.apache.kyuubi.ctl.util.CommandLineUtils
 
 /**
  * Main gateway of launching a Kyuubi Ctl action.
@@ -32,11 +33,11 @@ private[kyuubi] class ControlCli extends Logging {
     val ctlArgs = parseArguments(args)
 
     // when parse failed, exit
-    if (ctlArgs.cliArgs == null) {
+    if (ctlArgs.cliConfig == null) {
       sys.exit(1)
     }
 
-    val verbose = ctlArgs.cliArgs.commonOpts.verbose
+    val verbose = ctlArgs.cliConfig.commonOpts.verbose
     if (verbose) {
       super.info(ctlArgs.toString)
     }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCliArguments.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCliArguments.scala
index 29d4724d5..3d90b60b9 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCliArguments.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ControlCliArguments.scala
@@ -19,13 +19,19 @@ package org.apache.kyuubi.ctl
 
 import scopt.OParser
 
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiException, Logging}
 import org.apache.kyuubi.ctl.cmd._
+import org.apache.kyuubi.ctl.cmd.create.{CreateBatchCommand, CreateServerCommand}
+import org.apache.kyuubi.ctl.cmd.delete.{DeleteBatchCommand, DeleteEngineCommand, DeleteServerCommand}
+import org.apache.kyuubi.ctl.cmd.get.{GetBatchCommand, GetEngineCommand, GetServerCommand}
+import org.apache.kyuubi.ctl.cmd.list.{ListBatchCommand, ListEngineCommand, ListServerCommand}
+import org.apache.kyuubi.ctl.cmd.log.LogBatchCommand
+import org.apache.kyuubi.ctl.cmd.submit.SubmitBatchCommand
 
 class ControlCliArguments(args: Seq[String], env: Map[String, String] = sys.env)
   extends ControlCliArgumentsParser with Logging {
 
-  var cliArgs: CliConfig = null
+  var cliConfig: CliConfig = null
 
   var command: Command = null
 
@@ -48,48 +54,88 @@ class ControlCliArguments(args: Seq[String], env: Map[String, String] = sys.env)
         result match {
           case Some(arguments) =>
             command = getCommand(arguments)
-            command.preProcess()
-            cliArgs = command.cliArgs
+            command.validate()
+            cliConfig = command.normalizedCliConfig
           case _ =>
           // arguments are bad, exit
         }
     }
   }
 
-  private def getCommand(cliArgs: CliConfig): Command = {
-    cliArgs.action match {
-      case ServiceControlAction.CREATE => new CreateCommand(cliArgs)
-      case ServiceControlAction.GET => new GetCommand(cliArgs)
-      case ServiceControlAction.DELETE => new DeleteCommand(cliArgs)
-      case ServiceControlAction.LIST => new ListCommand(cliArgs)
-      case _ => null
+  private def getCommand(cliConfig: CliConfig): Command = {
+    cliConfig.action match {
+      case ControlAction.CREATE => cliConfig.resource match {
+          case ControlObject.BATCH => new CreateBatchCommand(cliConfig)
+          case ControlObject.SERVER => new CreateServerCommand(cliConfig)
+          case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+        }
+      case ControlAction.GET => cliConfig.resource match {
+          case ControlObject.BATCH => new GetBatchCommand(cliConfig)
+          case ControlObject.ENGINE => new GetEngineCommand(cliConfig)
+          case ControlObject.SERVER => new GetServerCommand(cliConfig)
+          case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+        }
+      case ControlAction.DELETE => cliConfig.resource match {
+          case ControlObject.BATCH => new DeleteBatchCommand(cliConfig)
+          case ControlObject.ENGINE => new DeleteEngineCommand(cliConfig)
+          case ControlObject.SERVER => new DeleteServerCommand(cliConfig)
+          case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+        }
+      case ControlAction.LIST => cliConfig.resource match {
+          case ControlObject.BATCH => new ListBatchCommand(cliConfig)
+          case ControlObject.ENGINE => new ListEngineCommand(cliConfig)
+          case ControlObject.SERVER => new ListServerCommand(cliConfig)
+          case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+        }
+      case ControlAction.LOG => cliConfig.resource match {
+          case ControlObject.BATCH => new LogBatchCommand(cliConfig)
+          case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+        }
+      case ControlAction.SUBMIT => cliConfig.resource match {
+          case ControlObject.BATCH => new SubmitBatchCommand(cliConfig)
+          case _ => throw new KyuubiException(s"Invalid resource: ${cliConfig.resource}")
+        }
+      case _ => throw new KyuubiException(s"Invalid operation: ${cliConfig.action}")
     }
   }
 
   override def toString: String = {
-    cliArgs.service match {
-      case ServiceControlObject.SERVER =>
+    cliConfig.resource match {
+      case ControlObject.BATCH =>
+        s"""Parsed arguments:
+           |  action                  ${cliConfig.action}
+           |  resource                ${cliConfig.resource}
+           |  batchId                 ${cliConfig.batchOpts.batchId}
+           |  batchType               ${cliConfig.batchOpts.batchType}
+           |  batchUser               ${cliConfig.batchOpts.batchUser}
+           |  batchState              ${cliConfig.batchOpts.batchState}
+           |  createTime              ${cliConfig.batchOpts.createTime}
+           |  endTime                 ${cliConfig.batchOpts.endTime}
+           |  from                    ${cliConfig.batchOpts.from}
+           |  size                    ${cliConfig.batchOpts.size}
+        """.stripMargin
+      case ControlObject.SERVER =>
         s"""Parsed arguments:
-           |  action                  ${cliArgs.action}
-           |  service                 ${cliArgs.service}
-           |  zkQuorum                ${cliArgs.commonOpts.zkQuorum}
-           |  namespace               ${cliArgs.commonOpts.namespace}
-           |  host                    ${cliArgs.commonOpts.host}
-           |  port                    ${cliArgs.commonOpts.port}
-           |  version                 ${cliArgs.commonOpts.version}
-           |  verbose                 ${cliArgs.commonOpts.verbose}
+           |  action                  ${cliConfig.action}
+           |  resource                ${cliConfig.resource}
+           |  zkQuorum                ${cliConfig.commonOpts.zkQuorum}
+           |  namespace               ${cliConfig.commonOpts.namespace}
+           |  host                    ${cliConfig.commonOpts.host}
+           |  port                    ${cliConfig.commonOpts.port}
+           |  version                 ${cliConfig.commonOpts.version}
+           |  verbose                 ${cliConfig.commonOpts.verbose}
         """.stripMargin
-      case ServiceControlObject.ENGINE =>
+      case ControlObject.ENGINE =>
         s"""Parsed arguments:
-           |  action                  ${cliArgs.action}
-           |  service                 ${cliArgs.service}
-           |  zkQuorum                ${cliArgs.commonOpts.zkQuorum}
-           |  namespace               ${cliArgs.commonOpts.namespace}
-           |  user                    ${cliArgs.engineOpts.user}
-           |  host                    ${cliArgs.commonOpts.host}
-           |  port                    ${cliArgs.commonOpts.port}
-           |  version                 ${cliArgs.commonOpts.version}
-           |  verbose                 ${cliArgs.commonOpts.verbose}
+           |  action                  ${cliConfig.action}
+           |  resource                ${cliConfig.resource}
+           |  zkQuorum                ${cliConfig.commonOpts.zkQuorum}
+           |  namespace               ${cliConfig.commonOpts.namespace}
+           |  user                    ${cliConfig.engineOpts.user}
+           |  host                    ${cliConfig.commonOpts.host}
+           |  port                    ${cliConfig.commonOpts.port}
+           |  version                 ${cliConfig.commonOpts.version}
+           |  verbose                 ${cliConfig.commonOpts.verbose}
         """.stripMargin
       case _ => ""
     }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala
new file mode 100644
index 000000000..1bde0e6cf
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CtlConf.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ctl
+
+import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
+
+object CtlConf {
+
+  private def buildConf(key: String): ConfigBuilder = KyuubiConf.buildConf(key)
+
+  val CTL_REST_CLIENT_BASE_URL: OptionalConfigEntry[String] =
+    buildConf("kyuubi.ctl.rest.base.url")
+      .doc("The REST API base URL, " +
+        "which contains the scheme (http:// or https://), host name, port number")
+      .version("1.6.0")
+      .stringConf
+      .createOptional
+
+  val CTL_REST_CLIENT_AUTH_SCHEMA: ConfigEntry[String] =
+    buildConf("kyuubi.ctl.rest.auth.schema")
+      .doc("The authentication schema. Valid values are: basic, spnego.")
+      .version("1.6.0")
+      .stringConf
+      .createWithDefault("basic")
+
+  val CTL_REST_CLIENT_SPNEGO_HOST: OptionalConfigEntry[String] =
+    buildConf("kyuubi.ctl.rest.spnego.host")
+      .doc("When auth schema is spnego, need to config spnego host.")
+      .version("1.6.0")
+      .stringConf
+      .createOptional
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
new file mode 100644
index 000000000..3fc98d22a
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl
+
+import java.util.{Map => JMap}
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.client.KyuubiRestClient
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ctl.CtlConf._
+
+object RestClientFactory {
+
+  /** Builds a client from the CLI options, yaml `map` and `conf`, runs `f`, then closes it. */
+  private[ctl] def withKyuubiRestClient(
+      cliConfig: CliConfig,
+      map: JMap[String, Object],
+      conf: KyuubiConf)(f: KyuubiRestClient => Unit): Unit = {
+    val kyuubiRestClient: KyuubiRestClient = getKyuubiRestClient(cliConfig, map, conf)
+    try {
+      f(kyuubiRestClient)
+    } finally {
+      kyuubiRestClient.close()
+    }
+  }
+
+  /**
+   * Creates the client for the resolved auth schema ("basic" or "spnego", case-insensitive).
+   * Optional conf entries are read with `orNull`: they are only fallbacks, so an unset entry
+   * must not fail the lookup when the command line or the yaml file supplies the value.
+   */
+  private def getKyuubiRestClient(
+      cliConfig: CliConfig,
+      map: JMap[String, Object],
+      conf: KyuubiConf): KyuubiRestClient = {
+    val version = getApiVersion(map)
+    val hostUrl =
+      getRestConfig("hostUrl", conf.get(CTL_REST_CLIENT_BASE_URL).orNull, cliConfig, map)
+    if (StringUtils.isBlank(hostUrl)) {
+      throw new KyuubiException(s"REST base url is missing, set ${CTL_REST_CLIENT_BASE_URL.key}")
+    }
+    val authSchema =
+      getRestConfig("authSchema", conf.get(CTL_REST_CLIENT_AUTH_SCHEMA), cliConfig, map)
+    // Locale.ROOT keeps the comparison independent of the JVM default locale
+    authSchema.toLowerCase(java.util.Locale.ROOT) match {
+      case "basic" =>
+        val username = getRestConfig("username", null, cliConfig, map)
+        KyuubiRestClient.builder(hostUrl)
+          .apiVersion(KyuubiRestClient.ApiVersion.valueOf(version))
+          .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC)
+          .username(username)
+          .password(cliConfig.commonOpts.password)
+          .build()
+      case "spnego" =>
+        val spnegoHost =
+          getRestConfig("spnegoHost", conf.get(CTL_REST_CLIENT_SPNEGO_HOST).orNull, cliConfig, map)
+        KyuubiRestClient.builder(hostUrl)
+          .apiVersion(KyuubiRestClient.ApiVersion.valueOf(version))
+          .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.SPNEGO)
+          .spnegoHost(spnegoHost)
+          .build()
+      case _ => throw new KyuubiException(s"Unsupported auth schema: $authSchema")
+    }
+  }
+
+  /** Returns the upper-cased "apiVersion" of the yaml `map`, or "V1" when absent or blank. */
+  private def getApiVersion(map: JMap[String, Object]): String = {
+    Option(map)
+      .flatMap(m => Option(m.get("apiVersion"))) // a missing key yields None, not an NPE
+      .map(_.toString.toUpperCase(java.util.Locale.ROOT))
+      .filter(v => StringUtils.isNotBlank(v))
+      .getOrElse("V1")
+  }
+
+  /**
+   * Resolves a REST setting from, in order, the command line, the yaml `map` and
+   * `defaultValue`; a null or blank value falls through to the next source.
+   */
+  private def getRestConfig(
+      key: String,
+      defaultValue: String,
+      cliConfig: CliConfig,
+      map: JMap[String, Object]): String = {
+    val commonOpts = cliConfig.commonOpts
+    val fromCommandLine: String = key match {
+      case "hostUrl" => commonOpts.hostUrl
+      case "authSchema" => commonOpts.authSchema
+      case "username" => commonOpts.username
+      case "spnegoHost" => commonOpts.spnegoHost
+      case _ => null
+    }
+    val fromMap = Option(map).flatMap(m => Option(m.get(key))).map(_.toString)
+
+    Option(fromCommandLine).filter(v => StringUtils.isNotBlank(v))
+      .orElse(fromMap.filter(v => StringUtils.isNotBlank(v)))
+      .getOrElse(defaultValue)
+  }
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
index 0850ff400..f0d0ca15a 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/Command.scala
@@ -16,82 +16,43 @@
  */
 package org.apache.kyuubi.ctl.cmd
 
-import java.net.InetAddress
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
+import java.nio.charset.StandardCharsets
+import java.util.HashMap
+
+import org.yaml.snakeyaml.Yaml
 
 import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUBDOMAIN, ENGINE_TYPE}
-import org.apache.kyuubi.ctl.{CliConfig, ServiceControlObject}
+import org.apache.kyuubi.ctl.CliConfig
 import org.apache.kyuubi.ctl.ControlCli.printMessage
 import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths, ServiceNodeInfo}
 
-abstract class Command(var cliArgs: CliConfig) extends Logging {
+abstract class Command(cliConfig: CliConfig) extends Logging {
+
+  protected val DEFAULT_LOG_QUERY_INTERVAL: Int = 1000
 
   val conf = KyuubiConf().loadFileDefaults()
 
-  val verbose = cliArgs.commonOpts.verbose
+  val verbose = cliConfig.commonOpts.verbose
 
-  def preProcess(): Unit = {
-    this.cliArgs = useDefaultPropertyValueIfMissing()
-    validateArguments()
-  }
+  val normalizedCliConfig: CliConfig = useDefaultPropertyValueIfMissing()
 
   /** Ensure that required fields exists. Call this only once all defaults are loaded. */
-  def validateArguments(): Unit
+  def validate(): Unit
 
   def run(): Unit
 
   def fail(msg: String): Unit = throw new KyuubiException(msg)
 
-  protected def validateZkArguments(): Unit = {
-    if (cliArgs.commonOpts.zkQuorum == null) {
-      fail("Zookeeper quorum is not specified and no default value to load")
-    }
-    if (cliArgs.commonOpts.namespace == null) {
-      fail("Zookeeper namespace is not specified and no default value to load")
-    }
-  }
-
-  protected def validateHostAndPort(): Unit = {
-    if (cliArgs.commonOpts.host == null) {
-      fail("Must specify host for service")
-    }
-    if (cliArgs.commonOpts.port == null) {
-      fail("Must specify port for service")
-    }
-
-    try {
-      InetAddress.getByName(cliArgs.commonOpts.host)
-    } catch {
-      case _: Exception =>
-        fail(s"Unknown host: ${cliArgs.commonOpts.host}")
-    }
-
-    try {
-      if (cliArgs.commonOpts.port.toInt <= 0) {
-        fail(s"Specified port should be a positive number")
-      }
-    } catch {
-      case _: NumberFormatException =>
-        fail(s"Specified port is not a valid integer number: ${cliArgs.commonOpts.port}")
-    }
-  }
-
-  protected def validateUser(): Unit = {
-    if (cliArgs.service == ServiceControlObject.ENGINE && cliArgs.engineOpts.user == null) {
-      fail("Must specify user name for engine, please use -u or --user.")
-    }
-  }
-
   protected def mergeArgsIntoKyuubiConf(): Unit = {
-    conf.set(HA_ADDRESSES.key, cliArgs.commonOpts.zkQuorum)
-    conf.set(HA_NAMESPACE.key, cliArgs.commonOpts.namespace)
+    conf.set(HA_ADDRESSES.key, normalizedCliConfig.commonOpts.zkQuorum)
+    conf.set(HA_NAMESPACE.key, normalizedCliConfig.commonOpts.namespace)
   }
 
   private def useDefaultPropertyValueIfMissing(): CliConfig = {
-    var arguments: CliConfig = cliArgs.copy()
-    if (cliArgs.commonOpts.zkQuorum == null) {
+    var arguments: CliConfig = cliConfig.copy()
+    if (cliConfig.commonOpts.zkQuorum == null) {
       conf.getOption(HA_ADDRESSES.key).foreach { v =>
         if (verbose) {
           super.info(s"Zookeeper quorum is not specified, use value from default conf:$v")
@@ -118,41 +79,24 @@ abstract class Command(var cliArgs: CliConfig) extends Logging {
     arguments
   }
 
-  private[ctl] def getZkNamespace(): String = {
-    cliArgs.service match {
-      case ServiceControlObject.SERVER =>
-        DiscoveryPaths.makePath(null, cliArgs.commonOpts.namespace)
-      case ServiceControlObject.ENGINE =>
-        val engineType = Some(cliArgs.engineOpts.engineType)
-          .filter(_ != null).filter(_.nonEmpty)
-          .getOrElse(conf.get(ENGINE_TYPE))
-        val engineSubdomain = Some(cliArgs.engineOpts.engineSubdomain)
-          .filter(_ != null).filter(_.nonEmpty)
-          .getOrElse(conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default"))
-        val engineShareLevel = Some(cliArgs.engineOpts.engineShareLevel)
-          .filter(_ != null).filter(_.nonEmpty)
-          .getOrElse(conf.get(ENGINE_SHARE_LEVEL))
-        // The path of the engine defined in zookeeper comes from
-        // org.apache.kyuubi.engine.EngineRef#engineSpace
-        DiscoveryPaths.makePath(
-          s"${cliArgs.commonOpts.namespace}_${cliArgs.commonOpts.version}_" +
-            s"${engineShareLevel}_${engineType}",
-          cliArgs.engineOpts.user,
-          Array(engineSubdomain))
-    }
-  }
+  private[ctl] def readConfig(): HashMap[String, Object] = { // parse createOpts.filename as yaml
+    val filename = normalizedCliConfig.createOpts.filename
 
-  private[ctl] def getServiceNodes(
-      discoveryClient: DiscoveryClient,
-      znodeRoot: String,
-      hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
-    val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
-    hostPortOpt match {
-      case Some((host, port)) => serviceNodes.filter { sn =>
-          sn.host == host && sn.port == port
-        }
-      case _ => serviceNodes
+    var map: HashMap[String, Object] = null
+    var br: BufferedReader = null
+    try {
+      val yaml = new Yaml() // NOTE(review): default constructor; confirm the file is trusted input
+      val input = new FileInputStream(new File(filename))
+      br = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
+      map = yaml.load(br).asInstanceOf[HashMap[String, Object]]
+    } catch {
+      case e: Exception => fail(s"Failed to read yaml file[$filename]: $e") // fail() always throws
+    } finally {
+      if (br != null) { // closing the reader also closes the wrapped FileInputStream
+        br.close()
+      }
     }
+    map // reached only when no exception was raised above
   }
 
   override def info(msg: => Any): Unit = printMessage(msg)
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/ListCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/ListCommand.scala
deleted file mode 100644
index 6c539dc22..000000000
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/ListCommand.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kyuubi.ctl.cmd
-
-import org.apache.kyuubi.ctl.{CliConfig, Render, ServiceControlObject}
-import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
-
-class ListCommand(cliConfig: CliConfig) extends Command(cliConfig) {
-
-  override def validateArguments(): Unit = {
-    validateZkArguments()
-    cliArgs.service match {
-      case ServiceControlObject.ENGINE => validateUser()
-      case _ =>
-    }
-    mergeArgsIntoKyuubiConf()
-  }
-
-  override def run(): Unit = {
-    list(filterHostPort = false)
-  }
-
-  /**
-   * List Kyuubi server nodes info.
-   */
-  private def list(filterHostPort: Boolean): Unit = {
-    withDiscoveryClient(conf) { discoveryClient =>
-      val znodeRoot = getZkNamespace()
-      val hostPortOpt =
-        if (filterHostPort) {
-          Some((cliArgs.commonOpts.host, cliArgs.commonOpts.port.toInt))
-        } else None
-      val nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
-
-      val title = "Zookeeper service nodes"
-      info(Render.renderServiceNodesInfo(title, nodes, verbose))
-    }
-  }
-}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala
new file mode 100644
index 000000000..2313cce8d
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.cmd.create
+
+import java.util.{ArrayList, HashMap}
+
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
+
+class CreateBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+
+  def validate(): Unit = {
+    Validator.validateFilename(normalizedCliConfig)
+  }
+
+  /** Builds a BatchRequest from the yaml file, submits it and prints the created batch. */
+  def run(): Unit = {
+    val map = CtlUtils.loadYamlAsMap(normalizedCliConfig)
+
+    withKyuubiRestClient(normalizedCliConfig, map, conf) { kyuubiRestClient =>
+      val request = map.get("request").asInstanceOf[HashMap[String, Object]]
+      // without this check a yaml file lacking `request` fails below with a bare NPE
+      if (request == null) fail("Must specify request in the yaml file for create batch command.")
+      val batchRequest = new BatchRequest(
+        map.get("batchType").asInstanceOf[String],
+        request.get("resource").asInstanceOf[String],
+        request.get("className").asInstanceOf[String],
+        request.get("name").asInstanceOf[String],
+        request.get("configs").asInstanceOf[HashMap[String, String]],
+        request.get("args").asInstanceOf[ArrayList[String]])
+      val batch: Batch = new BatchRestApi(kyuubiRestClient).createBatch(batchRequest)
+      info(Render.renderBatchInfo(batch))
+    }
+  }
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/CreateCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateServerCommand.scala
similarity index 75%
rename from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/CreateCommand.scala
rename to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateServerCommand.scala
index 63bf7d092..1dbd1ed46 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/CreateCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateServerCommand.scala
@@ -14,26 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl.cmd
+package org.apache.kyuubi.ctl.cmd.create
 
 import scala.collection.mutable.ListBuffer
 
-import org.apache.kyuubi.ctl.{CliConfig, Render, ServiceControlObject}
+import org.apache.kyuubi.ctl.{CliConfig, ControlObject}
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths, ServiceNodeInfo}
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
 
-class CreateCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+class CreateServerCommand(cliConfig: CliConfig) extends Command(cliConfig) {
 
-  def validateArguments(): Unit = {
-    if (cliArgs.service != ServiceControlObject.SERVER) {
+  def validate(): Unit = {
+    if (normalizedCliConfig.resource != ControlObject.SERVER) {
       fail("Only support expose Kyuubi server instance to another domain")
     }
-    validateZkArguments()
+
+    Validator.validateZkArguments(normalizedCliConfig)
 
     val defaultNamespace = conf.getOption(HA_NAMESPACE.key)
       .getOrElse(HA_NAMESPACE.defaultValStr)
-    if (defaultNamespace.equals(cliArgs.commonOpts.namespace)) {
+    if (defaultNamespace.equals(normalizedCliConfig.commonOpts.namespace)) {
       fail(
         s"""
            |Only support expose Kyuubi server instance to another domain, a different namespace
@@ -43,21 +46,17 @@ class CreateCommand(cliConfig: CliConfig) extends Command(cliConfig) {
 
   }
 
-  def run(): Unit = {
-    create()
-  }
-
   /**
    * Expose Kyuubi server instance to another domain.
    */
-  private def create(): Unit = {
+  def run(): Unit = {
     val kyuubiConf = conf
 
-    kyuubiConf.setIfMissing(HA_ADDRESSES, cliArgs.commonOpts.zkQuorum)
+    kyuubiConf.setIfMissing(HA_ADDRESSES, normalizedCliConfig.commonOpts.zkQuorum)
     withDiscoveryClient(kyuubiConf) { discoveryClient =>
       val fromNamespace =
         DiscoveryPaths.makePath(null, kyuubiConf.get(HA_NAMESPACE))
-      val toNamespace = getZkNamespace()
+      val toNamespace = CtlUtils.getZkNamespace(kyuubiConf, normalizedCliConfig)
 
       val currentServerNodes = discoveryClient.getServiceNodesInfo(fromNamespace)
       val exposedServiceNodes = ListBuffer[ServiceNodeInfo]()
@@ -69,7 +68,7 @@ class CreateCommand(cliConfig: CliConfig) extends Command(cliConfig) {
               s" from $fromNamespace to $toNamespace")
             val newNodePath = zc.createAndGetServiceNode(
               kyuubiConf,
-              cliArgs.commonOpts.namespace,
+              normalizedCliConfig.commonOpts.namespace,
               sn.instance,
               sn.version,
               true)
@@ -79,10 +78,10 @@ class CreateCommand(cliConfig: CliConfig) extends Command(cliConfig) {
           }
         }
 
-        if (kyuubiConf.get(HA_ADDRESSES) == cliArgs.commonOpts.zkQuorum) {
+        if (kyuubiConf.get(HA_ADDRESSES) == normalizedCliConfig.commonOpts.zkQuorum) {
           doCreate(discoveryClient)
         } else {
-          kyuubiConf.set(HA_ADDRESSES, cliArgs.commonOpts.zkQuorum)
+          kyuubiConf.set(HA_ADDRESSES, normalizedCliConfig.commonOpts.zkQuorum)
           withDiscoveryClient(kyuubiConf)(doCreate)
         }
       }
@@ -91,4 +90,5 @@ class CreateCommand(cliConfig: CliConfig) extends Command(cliConfig) {
       info(Render.renderServiceNodesInfo(title, exposedServiceNodes, verbose))
     }
   }
+
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
similarity index 51%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
index 39dd618d3..e871d311d 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala
@@ -14,20 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.delete
 
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.util.JsonUtil
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
 
-object Render {
+class DeleteBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
 
-  private[ctl] def renderServiceNodesInfo(
-      title: String,
-      serviceNodeInfo: Seq[ServiceNodeInfo],
-      verbose: Boolean): String = {
-    val header = Seq("Namespace", "Host", "Port", "Version")
-    val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
-      Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
+  var result: String = null // NOTE(review): never assigned in this class
+
+  def validate(): Unit = { // same batchId requirement as GetBatchCommand
+    if (normalizedCliConfig.batchOpts.batchId == null) {
+      fail("Must specify batchId for delete batch command.")
+    }
+  }
+
+  def run(): Unit = { // delete the batch over REST, then print the reply as JSON
+    withKyuubiRestClient(normalizedCliConfig, null, conf) { kyuubiRestClient =>
+      val opts = normalizedCliConfig.batchOpts
+      val reply = new BatchRestApi(kyuubiRestClient).deleteBatch(opts.batchId, opts.hs2ProxyUser)
+      info(JsonUtil.toJson(reply))
     }
-    Tabulator.format(title, header, rows, verbose)
   }
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/DeleteCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteCommand.scala
similarity index 73%
rename from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/DeleteCommand.scala
rename to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteCommand.scala
index d76f65bc3..e4a9bbbfe 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/DeleteCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteCommand.scala
@@ -14,35 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl.cmd
+package org.apache.kyuubi.ctl.cmd.delete
 
 import scala.collection.mutable.ListBuffer
 
-import org.apache.kyuubi.ctl.{CliConfig, Render}
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
 import org.apache.kyuubi.ha.client.ServiceNodeInfo
 
 class DeleteCommand(cliConfig: CliConfig) extends Command(cliConfig) {
 
-  def validateArguments(): Unit = {
-    validateZkArguments()
-    validateHostAndPort()
-    validateUser()
+  def validate(): Unit = {
+    Validator.validateZkArguments(normalizedCliConfig)
+    Validator.validateHostAndPort(normalizedCliConfig)
     mergeArgsIntoKyuubiConf()
   }
 
-  override def run(): Unit = {
-    delete()
-  }
-
   /**
    * Delete zookeeper service node with specified host port.
    */
-  private def delete(): Unit = {
+  def run(): Unit = {
     withDiscoveryClient(conf) { discoveryClient =>
-      val znodeRoot = getZkNamespace()
-      val hostPortOpt = Some((cliArgs.commonOpts.host, cliArgs.commonOpts.port.toInt))
-      val nodesToDelete = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
+      val znodeRoot = CtlUtils.getZkNamespace(conf, normalizedCliConfig)
+      val hostPortOpt =
+        Some((normalizedCliConfig.commonOpts.host, normalizedCliConfig.commonOpts.port.toInt))
+      val nodesToDelete = CtlUtils.getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
 
       val deletedNodes = ListBuffer[ServiceNodeInfo]()
       nodesToDelete.foreach { node =>
@@ -61,4 +59,5 @@ class DeleteCommand(cliConfig: CliConfig) extends Command(cliConfig) {
       info(Render.renderServiceNodesInfo(title, deletedNodes, verbose))
     }
   }
+
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteEngineCommand.scala
similarity index 62%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteEngineCommand.scala
index 39dd618d3..d0b0067a1 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteEngineCommand.scala
@@ -14,20 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.delete
 
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
 
-object Render {
+class DeleteEngineCommand(cliConfig: CliConfig) extends DeleteCommand(cliConfig) {
 
-  private[ctl] def renderServiceNodesInfo(
-      title: String,
-      serviceNodeInfo: Seq[ServiceNodeInfo],
-      verbose: Boolean): String = {
-    val header = Seq("Namespace", "Host", "Port", "Version")
-    val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
-      Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
+  override def validate(): Unit = {
+    super.validate()
+
+    // validate user
+    if (normalizedCliConfig.engineOpts.user == null) {
+      fail("Must specify user name for engine, please use -u or --user.")
     }
-    Tabulator.format(title, header, rows, verbose)
   }
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteServerCommand.scala
similarity index 61%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteServerCommand.scala
index 39dd618d3..fea1014a2 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteServerCommand.scala
@@ -14,20 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.delete
 
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
 
-object Render {
-
-  private[ctl] def renderServiceNodesInfo(
-      title: String,
-      serviceNodeInfo: Seq[ServiceNodeInfo],
-      verbose: Boolean): String = {
-    val header = Seq("Namespace", "Host", "Port", "Version")
-    val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
-      Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
-    }
-    Tabulator.format(title, header, rows, verbose)
-  }
-}
+class DeleteServerCommand(cliConfig: CliConfig) extends DeleteCommand(cliConfig) {}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetBatchCommand.scala
new file mode 100644
index 000000000..590f06463
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetBatchCommand.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.cmd.get
+
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.api.v1.dto.Batch
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.Render
+
+class GetBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+
+  /** Fails unless a batch id has been supplied. */
+  def validate(): Unit = {
+    val missingBatchId = normalizedCliConfig.batchOpts.batchId == null
+    if (missingBatchId) fail("Must specify batchId for get batch command.")
+  }
+
+  /** Looks the batch up through the REST API and prints its rendered info. */
+  def run(): Unit = {
+    withKyuubiRestClient(normalizedCliConfig, null, conf) { restClient =>
+      val fetched: Batch =
+        new BatchRestApi(restClient).getBatchById(normalizedCliConfig.batchOpts.batchId)
+      info(Render.renderBatchInfo(fetched))
+    }
+  }
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/GetCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetCommand.scala
similarity index 52%
rename from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/GetCommand.scala
rename to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetCommand.scala
index 745495504..49b2ca442 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/GetCommand.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetCommand.scala
@@ -14,34 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl.cmd
+package org.apache.kyuubi.ctl.cmd.get
 
-import org.apache.kyuubi.ctl.{CliConfig, Render}
-import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
 
 class GetCommand(cliConfig: CliConfig) extends Command(cliConfig) {
 
-  override def validateArguments(): Unit = {
-    validateZkArguments()
-    validateHostAndPort()
-    validateUser()
+  def validate(): Unit = {
+    Validator.validateZkArguments(normalizedCliConfig)
+    Validator.validateHostAndPort(normalizedCliConfig)
     mergeArgsIntoKyuubiConf()
   }
 
-  override def run(): Unit = list(filterHostPort = true)
-
-  private def list(filterHostPort: Boolean): Unit = {
-    withDiscoveryClient(conf) { discoveryClient =>
-      val znodeRoot = getZkNamespace()
-      val hostPortOpt =
-        if (filterHostPort) {
-          Some((cliArgs.commonOpts.host, cliArgs.commonOpts.port.toInt))
-        } else None
-      val nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
-
-      val title = "Zookeeper service nodes"
-      info(Render.renderServiceNodesInfo(title, nodes, verbose))
-    }
+  def run(): Unit = {
+    val nodes = CtlUtils.listZkServerNodes(conf, normalizedCliConfig, filterHostPort = true)
+    val title = "Zookeeper service nodes"
+    info(Render.renderServiceNodesInfo(title, nodes, verbose))
   }
 
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetEngineCommand.scala
similarity index 62%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetEngineCommand.scala
index 39dd618d3..17557ceb6 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetEngineCommand.scala
@@ -14,20 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.get
 
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
 
-object Render {
+class GetEngineCommand(cliConfig: CliConfig) extends GetCommand(cliConfig) {
 
-  private[ctl] def renderServiceNodesInfo(
-      title: String,
-      serviceNodeInfo: Seq[ServiceNodeInfo],
-      verbose: Boolean): String = {
-    val header = Seq("Namespace", "Host", "Port", "Version")
-    val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
-      Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
+  override def validate(): Unit = {
+    super.validate()
+    // Engine lookups additionally need a user: CtlUtils.getZkNamespace puts
+    // engineOpts.user into the engine's ZooKeeper path.
+    if (normalizedCliConfig.engineOpts.user == null) {
+      fail("Must specify user name for engine, please use -u or --user.")
     }
-    Tabulator.format(title, header, rows, verbose)
   }
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetServerCommand.scala
similarity index 61%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetServerCommand.scala
index 39dd618d3..fd0f52bd9 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/get/GetServerCommand.scala
@@ -14,20 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.get
 
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
 
-object Render {
-
-  private[ctl] def renderServiceNodesInfo(
-      title: String,
-      serviceNodeInfo: Seq[ServiceNodeInfo],
-      verbose: Boolean): String = {
-    val header = Seq("Namespace", "Host", "Port", "Version")
-    val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
-      Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
-    }
-    Tabulator.format(title, header, rows, verbose)
-  }
-}
+class GetServerCommand(cliConfig: CliConfig) extends GetCommand(cliConfig) // no overrides
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala
new file mode 100644
index 000000000..aa4608dbf
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.cmd.list
+
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.api.v1.dto.GetBatchesResponse
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.Render
+
+class ListBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+
+  /** Rejects negative time filters and a createTime later than a non-zero endTime. */
+  def validate(): Unit = {
+    val created = normalizedCliConfig.batchOpts.createTime
+    val ended = normalizedCliConfig.batchOpts.endTime
+    if (created < 0) {
+      fail("Invalid createTime, negative milliseconds are not supported.")
+    }
+    if (ended < 0) {
+      fail("Invalid endTime, negative milliseconds are not supported.")
+    }
+    if (ended != 0 && created > ended) { // no ordering check when endTime is 0
+      fail("Invalid createTime/endTime, createTime should be less or equal to endTime.")
+    }
+  }
+
+  /** Queries the batch REST API with the CLI filters and prints the rendered result. */
+  def run(): Unit = {
+    withKyuubiRestClient(normalizedCliConfig, null, conf) { restClient =>
+      val filters = normalizedCliConfig.batchOpts
+      val response: GetBatchesResponse = new BatchRestApi(restClient).listBatches(
+        filters.batchType,
+        filters.batchUser,
+        filters.batchState,
+        filters.createTime,
+        filters.endTime,
+        math.max(filters.from, 0), // a negative offset is sent as 0
+        filters.size)
+      info(Render.renderBatchListInfo(response))
+    }
+  }
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListCommand.scala
similarity index 57%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListCommand.scala
index 39dd618d3..ba44f7b51 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListCommand.scala
@@ -14,20 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.list
 
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render, Validator}
 
-object Render {
+class ListCommand(cliConfig: CliConfig) extends Command(cliConfig) {
 
-  private[ctl] def renderServiceNodesInfo(
-      title: String,
-      serviceNodeInfo: Seq[ServiceNodeInfo],
-      verbose: Boolean): String = {
-    val header = Seq("Namespace", "Host", "Port", "Version")
-    val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
-      Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
-    }
-    Tabulator.format(title, header, rows, verbose)
+  def validate(): Unit = { // only the ZooKeeper quorum and namespace are mandatory here
+    Validator.validateZkArguments(normalizedCliConfig)
+    mergeArgsIntoKyuubiConf()
   }
+
+  def run(): Unit = {
+    val nodes = CtlUtils.listZkServerNodes(conf, normalizedCliConfig, filterHostPort = false)
+    // No host/port filter: every node under the resolved namespace is printed.
+    val title = "Zookeeper service nodes"
+    info(Render.renderServiceNodesInfo(title, nodes, verbose))
+  }
+
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListEngineCommand.scala
similarity index 62%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListEngineCommand.scala
index 39dd618d3..1cad9ac23 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListEngineCommand.scala
@@ -14,20 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.list
 
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
 
-object Render {
+class ListEngineCommand(cliConfig: CliConfig) extends ListCommand(cliConfig) {
 
-  private[ctl] def renderServiceNodesInfo(
-      title: String,
-      serviceNodeInfo: Seq[ServiceNodeInfo],
-      verbose: Boolean): String = {
-    val header = Seq("Namespace", "Host", "Port", "Version")
-    val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
-      Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
+  override def validate(): Unit = {
+    super.validate()
+    // Engine listings additionally need a user: CtlUtils.getZkNamespace puts
+    // engineOpts.user into the engine's ZooKeeper path.
+    if (normalizedCliConfig.engineOpts.user == null) {
+      fail("Must specify user name for engine, please use -u or --user.")
     }
-    Tabulator.format(title, header, rows, verbose)
   }
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListServerCommand.scala
similarity index 61%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListServerCommand.scala
index 39dd618d3..cea56e70c 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListServerCommand.scala
@@ -14,20 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl
+package org.apache.kyuubi.ctl.cmd.list
 
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ctl.CliConfig
 
-object Render {
-
-  private[ctl] def renderServiceNodesInfo(
-      title: String,
-      serviceNodeInfo: Seq[ServiceNodeInfo],
-      verbose: Boolean): String = {
-    val header = Seq("Namespace", "Host", "Port", "Version")
-    val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
-      Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
-    }
-    Tabulator.format(title, header, rows, verbose)
-  }
-}
+class ListServerCommand(cliConfig: CliConfig) extends ListCommand(cliConfig) // no overrides
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
new file mode 100644
index 000000000..40ad5ec8c
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.cmd.log
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.api.v1.dto.{Batch, OperationLog}
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
+
+class LogBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+
+  /** Fails unless a batch id was supplied. */
+  def validate(): Unit = {
+    if (normalizedCliConfig.batchOpts.batchId == null) {
+      fail("Must specify batchId for log batch command.")
+    }
+  }
+
+  /**
+   * Prints one page of the batch's local log. With `logOpts.forward` set it then keeps
+   * polling until a page comes back empty for a batch that is neither PENDING nor RUNNING.
+   */
+  def run(): Unit = {
+    withKyuubiRestClient(normalizedCliConfig, null, conf) { kyuubiRestClient =>
+      val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
+      val batchId = normalizedCliConfig.batchOpts.batchId
+      val size = normalizedCliConfig.batchOpts.size
+      val requestedFrom = normalizedCliConfig.batchOpts.from
+
+      val firstPage: OperationLog = batchRestApi.getBatchLocalLog(batchId, requestedFrom, size)
+      firstPage.getLogRowSet.asScala.foreach(x => info(x))
+
+      // A negative requested offset counts as 0 when computing where the next page starts.
+      var from = math.max(requestedFrom, 0) + firstPage.getLogRowSet.size
+      var done = !normalizedCliConfig.logOpts.forward
+
+      while (!done) {
+        // Read the state BEFORE the log page. The former order (log, sleep, state) could see
+        // an empty page, then a finished state, and exit without printing the rows written
+        // in between.
+        val batch: Batch = batchRestApi.getBatchById(batchId)
+        val page: OperationLog = batchRestApi.getBatchLocalLog(batchId, from, size)
+        from += page.getLogRowSet.size
+        page.getLogRowSet.asScala.foreach(x => info(x))
+
+        val active = batch.getState() == "PENDING" || batch.getState() == "RUNNING"
+        done = page.getLogRowSet.size() == 0 && !active
+        // Sleep only when another poll follows, so the command returns promptly at the end.
+        if (!done) Thread.sleep(DEFAULT_LOG_QUERY_INTERVAL)
+      }
+    }
+  }
+
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
new file mode 100644
index 000000000..956aaf5d1
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.cmd.submit
+
+import java.util.{ArrayList, HashMap}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.client.BatchRestApi
+import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest, OperationLog}
+import org.apache.kyuubi.ctl.CliConfig
+import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
+import org.apache.kyuubi.ctl.cmd.Command
+import org.apache.kyuubi.ctl.util.{CtlUtils, Validator}
+
+class SubmitBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
+
+  /** The batch config file must be given and must exist. */
+  def validate(): Unit = {
+    Validator.validateFilename(normalizedCliConfig)
+  }
+
+  /**
+   * Creates a batch from the YAML config file, then prints its local log until a page
+   * comes back empty for a batch that is neither PENDING nor RUNNING.
+   */
+  def run(): Unit = {
+    val map = CtlUtils.loadYamlAsMap(normalizedCliConfig)
+
+    withKyuubiRestClient(normalizedCliConfig, map, conf) { kyuubiRestClient =>
+      val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
+
+      val request = map.get("request").asInstanceOf[HashMap[String, Object]]
+      // Report a missing `request:` section instead of failing with a NullPointerException.
+      if (request == null) fail("Must specify the request section in the batch config file.")
+      val batchRequest = new BatchRequest(
+        map.get("batchType").asInstanceOf[String], // read from the top level, not `request`
+        request.get("resource").asInstanceOf[String],
+        request.get("className").asInstanceOf[String],
+        request.get("name").asInstanceOf[String],
+        request.get("configs").asInstanceOf[HashMap[String, String]],
+        request.get("args").asInstanceOf[ArrayList[String]])
+
+      val batchId = batchRestApi.createBatch(batchRequest).getId
+      val size = normalizedCliConfig.batchOpts.size
+      var from = 0
+      var done = false
+      while (!done) {
+        // Read the state BEFORE the log page: the former order (log, sleep, state) could see
+        // an empty page, then a finished state, and drop the rows written in between.
+        val batch: Batch = batchRestApi.getBatchById(batchId)
+        val log: OperationLog = batchRestApi.getBatchLocalLog(batchId, from, size)
+        from += log.getLogRowSet.size
+        log.getLogRowSet.asScala.foreach(x => info(x))
+
+        val active = batch.getState() == "PENDING" || batch.getState() == "RUNNING"
+        done = log.getLogRowSet.size() == 0 && !active
+        if (!done) Thread.sleep(DEFAULT_LOG_QUERY_INTERVAL) // no wait after the final page
+      }
+    }
+  }
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CommandLineUtils.scala
similarity index 55%
copy from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
copy to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CommandLineUtils.scala
index 39dd618d3..7d2a29b1e 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CommandLineUtils.scala
@@ -14,20 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl
 
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+package org.apache.kyuubi.ctl.util
 
-object Render {
+import java.io.PrintStream
 
-  private[ctl] def renderServiceNodesInfo(
-      title: String,
-      serviceNodeInfo: Seq[ServiceNodeInfo],
-      verbose: Boolean): String = {
-    val header = Seq("Namespace", "Host", "Port", "Version")
-    val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
-      Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
-    }
-    Tabulator.format(title, header, rows, verbose)
-  }
+/**
+ * Contains basic command line parsing functionality and methods to parse some common Kyuubi Ctl
+ * options.
+ */
+private[kyuubi] trait CommandLineUtils extends CommandLineLoggingUtils {
+  // Abstract entry point, to be implemented by whatever mixes this trait in.
+  def main(args: Array[String]): Unit
+}
+
+private[kyuubi] trait CommandLineLoggingUtils {
+  // Exit hook, System.exit by default; a var so that tests can swap it out.
+  private[kyuubi] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
+  // Destination of printMessage, System.err by default; likewise reassignable.
+  private[kyuubi] var printStream: PrintStream = System.err
+
+  // scalastyle:off println
+  private[kyuubi] def printMessage(msg: Any): Unit = printStream.println(msg)
+  // scalastyle:on println
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CtlUtils.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CtlUtils.scala
new file mode 100644
index 000000000..c957111d0
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/CtlUtils.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.util
+
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
+import java.nio.charset.StandardCharsets
+import java.util.{Map => JMap}
+
+import org.yaml.snakeyaml.Yaml
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUBDOMAIN, ENGINE_TYPE}
+import org.apache.kyuubi.ctl.{CliConfig, ControlObject}
+import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths, ServiceNodeInfo}
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
+
+object CtlUtils {
+
+  /** Builds the ZooKeeper path of the servers or engines selected by `cliConfig.resource`. */
+  private[ctl] def getZkNamespace(conf: KyuubiConf, cliConfig: CliConfig): String = {
+    cliConfig.resource match {
+      case ControlObject.SERVER =>
+        DiscoveryPaths.makePath(null, cliConfig.commonOpts.namespace)
+      case ControlObject.ENGINE =>
+        // Option(...) turns a null CLI value into None; empty strings are dropped as well,
+        // so each setting falls back to KyuubiConf unless it was really given.
+        val engineType = Option(cliConfig.engineOpts.engineType)
+          .filter(_.nonEmpty)
+          .getOrElse(conf.get(ENGINE_TYPE))
+        val engineSubdomain = Option(cliConfig.engineOpts.engineSubdomain)
+          .filter(_.nonEmpty)
+          .getOrElse(conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default"))
+        val engineShareLevel = Option(cliConfig.engineOpts.engineShareLevel)
+          .filter(_.nonEmpty)
+          .getOrElse(conf.get(ENGINE_SHARE_LEVEL))
+        // The path of the engine defined in zookeeper comes from
+        // org.apache.kyuubi.engine.EngineRef#engineSpace
+        DiscoveryPaths.makePath(
+          s"${cliConfig.commonOpts.namespace}_" +
+            s"${cliConfig.commonOpts.version}_" +
+            s"${engineShareLevel}_${engineType}",
+          cliConfig.engineOpts.user,
+          Array(engineSubdomain))
+    }
+  }
+
+  /** Returns the nodes under `znodeRoot`, narrowed to one host/port pair when given. */
+  private[ctl] def getServiceNodes(
+      discoveryClient: DiscoveryClient,
+      znodeRoot: String,
+      hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
+    val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
+    hostPortOpt match {
+      case Some((host, port)) => serviceNodes.filter(sn => sn.host == host && sn.port == port)
+      case _ => serviceNodes
+    }
+  }
+
+  /** List Kyuubi server nodes info, optionally only those on the CLI's host and port. */
+  private[ctl] def listZkServerNodes(
+      conf: KyuubiConf,
+      cliConfig: CliConfig,
+      filterHostPort: Boolean): Seq[ServiceNodeInfo] = {
+    var nodes = Seq.empty[ServiceNodeInfo]
+    withDiscoveryClient(conf) { discoveryClient =>
+      val znodeRoot = getZkNamespace(conf, cliConfig)
+      val hostPortOpt =
+        if (!filterHostPort) None
+        else Some((cliConfig.commonOpts.host, cliConfig.commonOpts.port.toInt))
+      nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
+    }
+    nodes
+  }
+
+  /**
+   * Parses the YAML file named by `createOpts.filename`; any failure is rethrown as a
+   * KyuubiException that keeps the original exception as its cause.
+   * NOTE(review): an empty document presumably loads as null - callers do not check.
+   */
+  private[ctl] def loadYamlAsMap(cliConfig: CliConfig): JMap[String, Object] = {
+    val filename = cliConfig.createOpts.filename
+    var input: FileInputStream = null
+    try {
+      input = new FileInputStream(new File(filename))
+      val br = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
+      // NOTE(review): a bare Yaml() honours type tags in the file; only load trusted files.
+      new Yaml().load(br).asInstanceOf[JMap[String, Object]]
+    } catch {
+      case e: Exception =>
+        throw new KyuubiException(s"Failed to read yaml file[$filename]: $e", e)
+    } finally {
+      if (input != null) input.close() // closes the file even if building the reader failed
+    }
+  }
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/DateTimeUtils.scala
similarity index 54%
rename from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
rename to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/DateTimeUtils.scala
index 39dd618d3..67ac76333 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/Render.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/DateTimeUtils.scala
@@ -14,20 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.ctl
 
-import org.apache.kyuubi.ha.client.ServiceNodeInfo
+package org.apache.kyuubi.ctl.util
 
-object Render {
+import java.time.{Instant, LocalDateTime, ZoneId}
+import java.time.format.DateTimeFormatter
 
-  private[ctl] def renderServiceNodesInfo(
-      title: String,
-      serviceNodeInfo: Seq[ServiceNodeInfo],
-      verbose: Boolean): String = {
-    val header = Seq("Namespace", "Host", "Port", "Version")
-    val rows = serviceNodeInfo.sortBy(_.nodeName).map { sn =>
-      Seq(sn.namespace, sn.host, sn.port.toString, sn.version.getOrElse(""))
-    }
-    Tabulator.format(title, header, rows, verbose)
+private[ctl] object DateTimeUtils {
+
+  /** Parses `date` using `format` in the system zone; a null date yields 0. */
+  def dateStringToMillis(date: String, format: String): Long =
+    if (date == null) 0L
+    else LocalDateTime.parse(date, DateTimeFormatter.ofPattern(format))
+      .atZone(ZoneId.systemDefault).toInstant.toEpochMilli
+
+  /** Formats epoch millis in the system zone; millis <= 0 (no time set) yields "N/A". */
+  def millisToDateString(millis: Long, format: String): String = {
+    if (millis <= 0) "N/A"
+    else LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.systemDefault())
+      .format(DateTimeFormatter.ofPattern(format))
   }
 }
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Render.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Render.scala
new file mode 100644
index 000000000..210047fcf
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Render.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.client.api.v1.dto.{Batch, GetBatchesResponse}
+import org.apache.kyuubi.ctl.util.DateTimeUtils._
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
+
+private[ctl] object Render {
+  // Timestamp layout shared by every batch rendering below.
+  private val TimePattern = "yyyy-MM-dd HH:mm:ss"
+
+  /** Tabulates service nodes (namespace, host, port, version), ordered by node name. */
+  def renderServiceNodesInfo(
+      title: String,
+      serviceNodeInfo: Seq[ServiceNodeInfo],
+      verbose: Boolean): String = {
+    val orderedNodes = serviceNodeInfo.sortBy(_.nodeName)
+    val rows = orderedNodes.map { node =>
+      Seq(node.namespace, node.host, node.port.toString, node.version.getOrElse(""))
+    }
+    Tabulator.format(title, Seq("Namespace", "Host", "Port", "Version"), rows, verbose)
+  }
+
+  /** Tabulates the batches of a list response, ordered by create time, under a count title. */
+  def renderBatchListInfo(batchListInfo: GetBatchesResponse): String = {
+    val header =
+      Seq("Id", "Name", "User", "Type", "Instance", "State", "App Info", "Create Time", "End Time")
+    val byCreateTime = batchListInfo.getBatches.asScala.sortBy(_.getCreateTime)
+    val rows = byCreateTime.map { b =>
+      Seq(
+        b.getId, b.getName, b.getUser, b.getBatchType, b.getKyuubiInstance, b.getState,
+        b.getBatchInfo.toString,
+        millisToDateString(b.getCreateTime, TimePattern),
+        millisToDateString(b.getEndTime, TimePattern))
+    }
+    Tabulator.format(s"Total number of batches: ${batchListInfo.getTotal}", header, rows, true)
+  }
+
+  /** Renders a single batch as an indented "Batch Info:" text block. */
+  def renderBatchInfo(batch: Batch): String = {
+    s"""Batch Info:
+       |  Batch Id: ${batch.getId}
+       |  Type: ${batch.getBatchType}
+       |  Name: ${batch.getName}
+       |  User: ${batch.getUser}
+       |  State: ${batch.getState}
+       |  Kyuubi Instance: ${batch.getKyuubiInstance}
+       |  Create Time: ${millisToDateString(batch.getCreateTime, TimePattern)}
+       |  End Time: ${millisToDateString(batch.getEndTime, TimePattern)}
+       |  App Info: ${batch.getBatchInfo.toString}
+        """.stripMargin
+  }
+}
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLineUtils.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Tabulator.scala
similarity index 83%
rename from kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLineUtils.scala
rename to kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Tabulator.scala
index 98714f731..eaf68d3e5 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/CommandLineUtils.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Tabulator.scala
@@ -15,32 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.ctl
-
-import java.io.PrintStream
+package org.apache.kyuubi.ctl.util
 
 import org.apache.commons.lang3.StringUtils
 
-/**
- * Contains basic command line parsing functionality and methods to parse some common Kyuubi Ctl
- * options.
- */
-private[kyuubi] trait CommandLineUtils extends CommandLineLoggingUtils {
-
-  def main(args: Array[String]): Unit
-}
-
-private[kyuubi] trait CommandLineLoggingUtils {
-  // Exposed for testing
-  private[kyuubi] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
-
-  private[kyuubi] var printStream: PrintStream = System.err
-
-  // scalastyle:off println
-  private[kyuubi] def printMessage(msg: Any): Unit = printStream.println(msg)
-  // scalastyle:on println
-}
-
 /** Refer the showString method of org.apache.spark.sql.Dataset */
 private[kyuubi] object Tabulator {
 
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala
new file mode 100644
index 000000000..67fe497e1
--- /dev/null
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/util/Validator.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl.util
+
+import java.net.InetAddress
+import java.nio.file.{Files, Paths}
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.ctl.CliConfig
+
+private[ctl] object Validator {
+
+  /** Fails unless both the ZooKeeper quorum and the namespace are set. */
+  def validateZkArguments(cliConfig: CliConfig): Unit = {
+    if (cliConfig.commonOpts.zkQuorum == null) {
+      fail("Zookeeper quorum is not specified and no default value to load")
+    }
+    if (cliConfig.commonOpts.namespace == null) {
+      fail("Zookeeper namespace is not specified and no default value to load")
+    }
+  }
+
+  /** Fails unless host resolves via InetAddress and port is a positive integer string. */
+  def validateHostAndPort(cliConfig: CliConfig): Unit = {
+    if (cliConfig.commonOpts.host == null) {
+      fail("Must specify host for service")
+    }
+    if (cliConfig.commonOpts.port == null) {
+      fail("Must specify port for service")
+    }
+
+    try {
+      InetAddress.getByName(cliConfig.commonOpts.host)
+    } catch {
+      case _: Exception =>
+        fail(s"Unknown host: ${cliConfig.commonOpts.host}")
+    }
+
+    try {
+      if (cliConfig.commonOpts.port.toInt <= 0) {
+        fail("Specified port should be a positive number")
+      }
+    } catch {
+      case _: NumberFormatException =>
+        fail(s"Specified port is not a valid integer number: ${cliConfig.commonOpts.port}")
+    }
+  }
+
+  /** Fails when no config file name is given or the named file does not exist. */
+  def validateFilename(cliConfig: CliConfig): Unit = {
+    val filename = cliConfig.createOpts.filename
+    if (StringUtils.isBlank(filename)) {
+      fail("Config file is not specified.")
+    }
+    // toAbsolutePath resolves relative names against the working directory on every
+    // platform; the former startsWith("/") test took Windows absolute paths for relative.
+    val path = Paths.get(filename).toAbsolutePath
+    if (!Files.exists(path)) {
+      fail(s"Config file does not exist: ${path}.")
+    }
+  }
+
+  private def fail(msg: String): Unit = throw new KyuubiException(msg)
+}
diff --git a/kyuubi-ctl/src/test/resources/cli/batch.yaml b/kyuubi-ctl/src/test/resources/cli/batch.yaml
new file mode 100644
index 000000000..ee3d6bdd0
--- /dev/null
+++ b/kyuubi-ctl/src/test/resources/cli/batch.yaml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: v1
+user: test_user
+request:
+  name: test_batch
+  resource: /MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar
+  className: org.apache.kyuubi.engine.spark.SparkSQLEngine
+  args:
+    - x1
+    - x2
+  configs:
+    hive.server2.proxy.user: b_user
+    wait.completion: true
+    k1: v1
+options:
+  verbose: true
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala
new file mode 100644
index 000000000..d0d41a0a4
--- /dev/null
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/BatchCliArgumentsSuite.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.ctl
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.ctl.util.DateTimeUtils._
+
+/**
+ * Command-line parsing tests for the `batch` resource of kyuubi-ctl.
+ *
+ * Each case assembles an argument vector and then either inspects the options on
+ * `ControlCliArguments.cliConfig` or hands the vector, together with the expected
+ * failure text, to `testPrematureExitForControlCliArgs`.
+ */
+class BatchCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
+
+  // Fixture values reused by several cases below.
+  private val batchYaml = "src/test/resources/cli/batch.yaml"
+  private val batchId = "f7fd702c-e54e-11ec-8fea-0242ac120002"
+
+  /** Builds `list batch --batchType spark --size 4` followed by the `extra` arguments. */
+  private def listSparkArgs(extra: String*): Array[String] =
+    Array("list", "batch", "--batchType", "spark", "--size", "4") ++ extra
+
+  test("create/submit batch") {
+    for (action <- Seq("create", "submit")) {
+      val argv = Seq(action, "batch", "-f", batchYaml)
+      val parsed = new ControlCliArguments(argv)
+      assert(parsed.cliConfig.createOpts.filename == batchYaml)
+    }
+  }
+
+  test("create/submit batch and overwrite rest config") {
+    for (action <- Seq("create", "submit")) {
+      val argv = Array(
+        action,
+        "batch",
+        "-f",
+        batchYaml,
+        "--hostUrl",
+        "https://localhost:8440",
+        "--username",
+        "test_user_1",
+        "--authSchema",
+        "spnego")
+      val common = new ControlCliArguments(argv).cliConfig.commonOpts
+      assert(common.hostUrl == "https://localhost:8440")
+      assert(common.authSchema == "spnego")
+      assert(common.username == "test_user_1")
+    }
+  }
+
+  test("create/submit batch without filename specified") {
+    for (action <- Seq("create", "submit")) {
+      testPrematureExitForControlCliArgs(Array(action, "batch"), "Config file is not specified.")
+    }
+  }
+
+  test("create/submit batch with non-existed file") {
+    for (action <- Seq("create", "submit")) {
+      val argv = Array(action, "batch", "-f", "fake.yaml")
+      testPrematureExitForControlCliArgs(argv, "Config file does not exist")
+    }
+  }
+
+  test("get batch without batch id specified") {
+    val argv = Array("get", "batch")
+    testPrematureExitForControlCliArgs(argv, "Must specify batchId for get batch command")
+  }
+
+  test("get batch") {
+    val argv = Array("get", "batch", batchId)
+    val opts = new ControlCliArguments(argv).cliConfig.batchOpts
+    assert(opts.batchId == batchId)
+  }
+
+  test("test list batch option") {
+    val filters = Array("--batchType", "spark", "--batchUser", "tom", "--batchState", "RUNNING")
+    val timeAndPaging = Array("--createTime", "20220607000000", "--from", "2", "--size", "5")
+    val argv = Array("list", "batch") ++ filters ++ timeAndPaging
+    val opts = new ControlCliArguments(argv).cliConfig.batchOpts
+    assert(opts.batchType == "spark")
+    assert(opts.batchUser == "tom")
+    assert(opts.batchState == "RUNNING")
+    assert(opts.createTime == dateStringToMillis("20220607000000", "yyyyMMddHHmmss"))
+    // --endTime is not passed above; the case expects the field to read 0.
+    assert(opts.endTime == 0)
+    assert(opts.from == 2)
+    assert(opts.size == 5)
+  }
+
+  test("test list batch default option") {
+    val argv = Array("list", "batch")
+    val opts = new ControlCliArguments(argv).cliConfig.batchOpts
+    assert(opts.batchType == null)
+    assert(opts.from == -1)
+    assert(opts.size == 100)
+  }
+
+  test("test bad list batch option - size") {
+    val argv = Array("list", "batch", "--batchType", "spark", "--size", "-4")
+    testPrematureExitForControlCliArgs(argv, "Option --size must be >=0")
+  }
+
+  test("test bad list batch option - create date format") {
+    val argv = listSparkArgs("--createTime", "20220101")
+    testPrematureExitForControlCliArgs(
+      argv,
+      "Option --createTime must be in yyyyMMddHHmmss format.")
+  }
+
+  test("test bad list batch option - end date format") {
+    val argv = listSparkArgs("--endTime", "20220101")
+    testPrematureExitForControlCliArgs(argv, "Option --endTime must be in yyyyMMddHHmmss format.")
+  }
+
+  test("test bad list batch option - negative create date") {
+    val argv = listSparkArgs("--createTime", "19690101000000")
+    testPrematureExitForControlCliArgs(
+      argv,
+      "Invalid createTime, negative milliseconds are not supported.")
+  }
+
+  test("test bad list batch option - negative end date") {
+    val argv = listSparkArgs("--endTime", "19690101000000")
+    testPrematureExitForControlCliArgs(
+      argv,
+      "Invalid endTime, negative milliseconds are not supported.")
+  }
+
+  test("test bad list batch option - createTime > endTime") {
+    val argv = listSparkArgs("--createTime", "20220602000000", "--endTime", "20220601000000")
+    testPrematureExitForControlCliArgs(
+      argv,
+      "Invalid createTime/endTime, createTime should be less or equal to endTime.")
+  }
+
+  test("test log batch") {
+    val argv = Array("log", "batch", batchId, "--from", "2", "--size", "5")
+    val opts = new ControlCliArguments(argv).cliConfig.batchOpts
+    assert(opts.batchId == batchId)
+    assert(opts.from == 2)
+    assert(opts.size == 5)
+  }
+
+  test("test log batch without batchId") {
+    val argv = Array("log", "batch", "--from", "2", "--size", "5")
+    testPrematureExitForControlCliArgs(argv, "Must specify batchId for log batch command")
+  }
+
+  test("test log batch default option") {
+    val argv = Array("log", "batch", batchId)
+    val opts = new ControlCliArguments(argv).cliConfig.batchOpts
+    assert(opts.batchId == batchId)
+    assert(opts.from == -1)
+    assert(opts.size == 100)
+  }
+
+}
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
index 9f424a742..392f0e3c1 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
@@ -20,33 +20,13 @@ package org.apache.kyuubi.ctl
 import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
 
-class ControlCliArgumentsSuite extends KyuubiFunSuite {
+class ControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
   val zkQuorum = "localhost:2181"
   val namespace = "kyuubi"
   val user = "kyuubi"
   val host = "localhost"
   val port = "10000"
 
-  /** Check whether the script exits and the given search string is printed. */
-  private def testPrematureExit(args: Array[String], searchString: String): Unit = {
-    val logAppender = new LogAppender("test premature exit")
-    withLogAppender(logAppender) {
-      val thread = new Thread {
-        override def run(): Unit =
-          try {
-            new ControlCliArguments(args)
-          } catch {
-            case e: Exception =>
-              error(e)
-          }
-      }
-      thread.start()
-      thread.join()
-      assert(logAppender.loggingEvents.exists(
-        _.getMessage.getFormattedMessage.contains(searchString)))
-    }
-  }
-
   /** Check whether the script exits and the given search string is printed. */
   private def testHelpExit(args: Array[String], searchString: String): Unit = {
     val logAppender = new LogAppender("test premature exit")
@@ -91,14 +71,14 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
           "--version",
           KYUUBI_VERSION)
         val opArgs = new ControlCliArguments(args)
-        assert(opArgs.cliArgs.action.toString.equalsIgnoreCase(op))
-        assert(opArgs.cliArgs.service.toString.equalsIgnoreCase(service))
-        assert(opArgs.cliArgs.commonOpts.zkQuorum == zkQuorum)
-        assert(opArgs.cliArgs.commonOpts.namespace == namespace)
-        assert(opArgs.cliArgs.engineOpts.user == user)
-        assert(opArgs.cliArgs.commonOpts.host == host)
-        assert(opArgs.cliArgs.commonOpts.port == port)
-        assert(opArgs.cliArgs.commonOpts.version == KYUUBI_VERSION)
+        assert(opArgs.cliConfig.action.toString.equalsIgnoreCase(op))
+        assert(opArgs.cliConfig.resource.toString.equalsIgnoreCase(service))
+        assert(opArgs.cliConfig.commonOpts.zkQuorum == zkQuorum)
+        assert(opArgs.cliConfig.commonOpts.namespace == namespace)
+        assert(opArgs.cliConfig.engineOpts.user == user)
+        assert(opArgs.cliConfig.commonOpts.host == host)
+        assert(opArgs.cliConfig.commonOpts.port == port)
+        assert(opArgs.cliConfig.commonOpts.version == KYUUBI_VERSION)
       }
     }
 
@@ -120,31 +100,37 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
         "--version",
         KYUUBI_VERSION)
       val opArgs = new ControlCliArguments(args)
-      assert(opArgs.cliArgs.action.toString.equalsIgnoreCase(op))
-      assert(opArgs.cliArgs.service.toString.equalsIgnoreCase(service))
-      assert(opArgs.cliArgs.commonOpts.zkQuorum == zkQuorum)
-      assert(opArgs.cliArgs.commonOpts.namespace == newNamespace)
-      assert(opArgs.cliArgs.commonOpts.host == host)
-      assert(opArgs.cliArgs.commonOpts.port == port)
-      assert(opArgs.cliArgs.commonOpts.version == KYUUBI_VERSION)
+      assert(opArgs.cliConfig.action.toString.equalsIgnoreCase(op))
+      assert(opArgs.cliConfig.resource.toString.equalsIgnoreCase(service))
+      assert(opArgs.cliConfig.commonOpts.zkQuorum == zkQuorum)
+      assert(opArgs.cliConfig.commonOpts.namespace == newNamespace)
+      assert(opArgs.cliConfig.commonOpts.host == host)
+      assert(opArgs.cliConfig.commonOpts.port == port)
+      assert(opArgs.cliConfig.commonOpts.version == KYUUBI_VERSION)
     }
   }
 
   test("prints usage on empty input") {
-    testPrematureExit(Array.empty[String], "Must specify action command: [create|get|delete|list].")
-    testPrematureExit(Array("--verbose"), "Must specify action command: [create|get|delete|list].")
+    testPrematureExitForControlCliArgs(
+      Array.empty[String],
+      "Must specify action command: [create|get|delete|list|log|submit].")
+    testPrematureExitForControlCliArgs(
+      Array("--verbose"),
+      "Must specify action command: [create|get|delete|list|log|submit].")
   }
 
   test("prints error with unrecognized options") {
-    testPrematureExit(Array("create", "--unknown"), "Unknown option --unknown")
-    testPrematureExit(Array("--unknown"), "Unknown option --unknown")
+    testPrematureExitForControlCliArgs(Array("create", "--unknown"), "Unknown option --unknown")
+    testPrematureExitForControlCliArgs(Array("--unknown"), "Unknown option --unknown")
   }
 
   test("test invalid arguments") {
     // for server, user option is not support
-    testPrematureExit(Array("create", "--user"), "Unknown option --user")
+    testPrematureExitForControlCliArgs(Array("create", "--user"), "Unknown option --user")
     // for engine, user option need a value
-    testPrematureExit(Array("get", "engine", "--user"), "Missing value after --user")
+    testPrematureExitForControlCliArgs(
+      Array("get", "engine", "--user"),
+      "Missing value after --user")
   }
 
   test("test extra unused arguments") {
@@ -152,13 +138,13 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
       "list",
       "extraArg1",
       "extraArg2")
-    testPrematureExit(args, "Unknown argument 'extraArg1'")
+    testPrematureExitForControlCliArgs(args, "Unknown argument 'extraArg1'")
   }
 
   test("test list action arguments") {
     val args = Array(
       "list")
-    testPrematureExit(args, "Zookeeper quorum is not specified")
+    testPrematureExitForControlCliArgs(args, "Zookeeper quorum is not specified")
 
     val args2 = Array(
       "list",
@@ -167,14 +153,14 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
       "--namespace",
       namespace)
     val opArgs = new ControlCliArguments(args2)
-    assert(opArgs.cliArgs.action == ServiceControlAction.LIST)
+    assert(opArgs.cliConfig.action == ControlAction.LIST)
   }
 
   test("test get/delete action arguments") {
     Seq("get", "delete").foreach { op =>
       val args = Array(
         op)
-      testPrematureExit(args, "Zookeeper quorum is not specified")
+      testPrematureExitForControlCliArgs(args, "Zookeeper quorum is not specified")
 
       val args2 = Array(
         op,
@@ -182,7 +168,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
         zkQuorum,
         "--namespace",
         namespace)
-      testPrematureExit(args2, "Must specify host")
+      testPrematureExitForControlCliArgs(args2, "Must specify host")
 
       val args3 = Array(
         op,
@@ -192,7 +178,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
         namespace,
         "--host",
         host)
-      testPrematureExit(args3, "Must specify port")
+      testPrematureExitForControlCliArgs(args3, "Must specify port")
 
       val args4 = Array(
         op,
@@ -205,7 +191,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
         host,
         "--port",
         port)
-      testPrematureExit(args4, "Must specify user name for engine")
+      testPrematureExitForControlCliArgs(args4, "Must specify user name for engine")
 
       val args5 = Array(
         op,
@@ -219,7 +205,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
         "--port",
         port)
       val opArgs6 = new ControlCliArguments(args5)
-      assert(opArgs6.cliArgs.action.toString.equalsIgnoreCase(op))
+      assert(opArgs6.cliConfig.action.toString.equalsIgnoreCase(op))
     }
   }
 
@@ -235,7 +221,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
       "unknown-host",
       "--port",
       port)
-    testPrematureExit(args, "Unknown host")
+    testPrematureExitForControlCliArgs(args, "Unknown host")
   }
 
   test("test with invalid port specification") {
@@ -250,7 +236,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
       host,
       "--port",
       "invalid-format")
-    testPrematureExit(args, "Specified port is not a valid integer number")
+    testPrematureExitForControlCliArgs(args, "Specified port is not a valid integer number")
 
     val args2 = Array(
       "get",
@@ -263,7 +249,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
       host,
       "--port",
       "0")
-    testPrematureExit(args2, "Specified port should be a positive number")
+    testPrematureExitForControlCliArgs(args2, "Specified port should be a positive number")
   }
 
   test("test create action arguments") {
@@ -272,7 +258,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
       val op = "create"
       val args = Array(
         op)
-      testPrematureExit(args, "Zookeeper quorum is not specified")
+      testPrematureExitForControlCliArgs(args, "Zookeeper quorum is not specified")
 
       val args2 = Array(
         op,
@@ -282,7 +268,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
         "--namespace",
         newNamespace)
       val opArgs2 = new ControlCliArguments(args2)
-      assert(opArgs2.cliArgs.action.toString.equalsIgnoreCase(op))
+      assert(opArgs2.cliConfig.action.toString.equalsIgnoreCase(op))
 
       val args4 = Array(
         op,
@@ -292,7 +278,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
         "--namespace",
         newNamespace)
       // engine is not support, expect scopt print Unknown argument.
-      testPrematureExit(args4, "Unknown argument 'engine'")
+      testPrematureExitForControlCliArgs(args4, "Unknown argument 'engine'")
     }
   }
 
@@ -302,8 +288,8 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
       "--zk-quorum",
       zkQuorum)
     val opArgs = new ControlCliArguments(args)
-    assert(opArgs.cliArgs.commonOpts.namespace == namespace)
-    assert(opArgs.cliArgs.commonOpts.version == KYUUBI_VERSION)
+    assert(opArgs.cliConfig.commonOpts.namespace == namespace)
+    assert(opArgs.cliConfig.commonOpts.version == KYUUBI_VERSION)
   }
 
   test("test use short options") {
@@ -325,14 +311,14 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
           "-v",
           KYUUBI_VERSION)
         val opArgs = new ControlCliArguments(args)
-        assert(opArgs.cliArgs.action.toString.equalsIgnoreCase(op))
-        assert(opArgs.cliArgs.service.toString.equalsIgnoreCase(service))
-        assert(opArgs.cliArgs.commonOpts.zkQuorum == zkQuorum)
-        assert(opArgs.cliArgs.commonOpts.namespace == namespace)
-        assert(opArgs.cliArgs.engineOpts.user == user)
-        assert(opArgs.cliArgs.commonOpts.host == host)
-        assert(opArgs.cliArgs.commonOpts.port == port)
-        assert(opArgs.cliArgs.commonOpts.version == KYUUBI_VERSION)
+        assert(opArgs.cliConfig.action.toString.equalsIgnoreCase(op))
+        assert(opArgs.cliConfig.resource.toString.equalsIgnoreCase(service))
+        assert(opArgs.cliConfig.commonOpts.zkQuorum == zkQuorum)
+        assert(opArgs.cliConfig.commonOpts.namespace == namespace)
+        assert(opArgs.cliConfig.engineOpts.user == user)
+        assert(opArgs.cliConfig.commonOpts.host == host)
+        assert(opArgs.cliConfig.commonOpts.port == port)
+        assert(opArgs.cliConfig.commonOpts.version == KYUUBI_VERSION)
       }
     }
 
@@ -343,7 +329,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
       zkQuorum,
       "-b")
     val opArgs3 = new ControlCliArguments(args2)
-    assert(opArgs3.cliArgs.commonOpts.verbose)
+    assert(opArgs3.cliConfig.commonOpts.verbose)
   }
 
   test("test --help") {
@@ -353,7 +339,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
       " change it if the active service is running in another."
     val helpString =
       s"""kyuubi $KYUUBI_VERSION
-         |Usage: kyuubi-ctl [create|get|delete|list] [options]
+         |Usage: kyuubi-ctl [create|get|delete|list|log|submit] [options] <args>...
          |
          |  -zk, --zk-quorum <value>
          |                           $zkHelpString
@@ -362,14 +348,25 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
          |  -p, --port <value>       Listening port of a service.
          |  -v, --version <value>    $versionHelpString
          |  -b, --verbose            Print additional debug output.
+         |  --hostUrl <value>        Host url for rest api.
+         |  --authSchema <value>     Auth schema for rest api, valid values are basic, spnego.
+         |  --username <value>       Username for basic authentication.
+         |  --password <value>       Password for basic authentication.
+         |  --spnegoHost <value>     Spnego host for spnego authentication.
          |
-         |Command: create [server]
-         |
+         |Command: create [batch|server] [options]
+         |${"\t"}Create a resource.
+         |  -f, --filename <value>   Filename to use to create the resource
+         |Command: create batch
+         |${"\t"}Open batch session.
          |Command: create server
          |${"\t"}Expose Kyuubi server instance to another domain.
          |
-         |Command: get [server|engine] [options]
-         |${"\t"}Get the service/engine node info, host and port needed.
+         |Command: get [batch|server|engine] [options] [<batchId>]
+         |${"\t"}Display information about the specified resources.
+         |Command: get batch
+         |${"\t"}Get batch by id.
+         |  <batchId>                Batch id.
          |Command: get server
          |${"\t"}Get Kyuubi server info of domain
          |Command: get engine
@@ -382,8 +379,12 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
          |  -esl, --engine-share-level <value>
          |                           The engine share level this engine belong to.
          |
-         |Command: delete [server|engine] [options]
-         |${"\t"}Delete the specified service/engine node, host and port needed.
+         |Command: delete [batch|server|engine] [options] [<batchId>]
+         |${"\t"}Delete resources.
+         |Command: delete batch
+         |${"\t"}Close batch session.
+         |  <batchId>                Batch id.
+         |  --hs2ProxyUser <value>   The value of hive.server2.proxy.user config.
          |Command: delete server
          |${"\t"}Delete the specified service node for a domain
          |Command: delete engine
@@ -396,8 +397,17 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
          |  -esl, --engine-share-level <value>
          |                           The engine share level this engine belong to.
          |
-         |Command: list [server|engine] [options]
-         |${"\t"}List all the service/engine nodes for a particular domain.
+         |Command: list [batch|server|engine] [options]
+         |${"\t"}List information about resources.
+         |Command: list batch
+         |${"\t"}List batch session info.
+         |  --batchType <value>      Batch type.
+         |  --batchUser <value>      Batch user.
+         |  --batchState <value>     Batch state.
+         |  --createTime <value>     Batch create time, should be in yyyyMMddHHmmss format.
+         |  --endTime <value>        Batch end time, should be in yyyyMMddHHmmss format.
+         |  --from <value>           Specify which record to start from retrieving info.
+         |  --size <value>           The max number of records returned in the query.
          |Command: list server
          |${"\t"}List all the service nodes for a particular domain
          |Command: list engine
@@ -410,6 +420,21 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite {
          |  -esl, --engine-share-level <value>
          |                           The engine share level this engine belong to.
          |
+         |Command: log [batch] [options] [<batchId>]
+         |${"\t"}Print the logs for specified resource.
+         |  --forward                If forward is specified, the ctl will block forever.
+         |Command: log batch
+         |${"\t"}Get batch session local log.
+         |  <batchId>                Batch id.
+         |  --from <value>           Specify which record to start from retrieving info.
+         |  --size <value>           The max number of records returned in the query.
+         |
+         |Command: submit [batch] [options]
+         |${"\t"}Combination of create, get and log commands.
+         |  -f, --filename <value>   Filename to use to create the resource
+         |Command: submit batch
+         |${"\t"}open batch session and wait for completion.
+         |
          |  -h, --help               Show help message and exit.""".stripMargin
 
     testHelpExit(Array("--help"), helpString)
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala
index f1a9984b9..cdd87b147 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliSuite.scala
@@ -17,73 +17,15 @@
 
 package org.apache.kyuubi.ctl
 
-import java.io.{OutputStream, PrintStream}
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ctl.util.{CtlUtils, Render}
 import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_NAMESPACE}
 import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
-trait TestPrematureExit {
-  suite: KyuubiFunSuite =>
-
-  private val noOpOutputStream = new OutputStream {
-    def write(b: Int) = {}
-  }
-
-  /** Simple PrintStream that reads data into a buffer */
-  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
-    var lineBuffer = ArrayBuffer[String]()
-    // scalastyle:off println
-    override def println(line: Any): Unit = {
-      lineBuffer += line.toString
-    }
-    // scalastyle:on println
-  }
-
-  /** Returns true if the script exits and the given search string is printed. */
-  private[kyuubi] def testPrematureExit(
-      input: Array[String],
-      searchString: String,
-      mainObject: CommandLineUtils = ControlCli): Unit = {
-    val printStream = new BufferPrintStream()
-    mainObject.printStream = printStream
-
-    @volatile var exitedCleanly = false
-    val original = mainObject.exitFn
-    mainObject.exitFn = (_) => exitedCleanly = true
-    try {
-      @volatile var exception: Exception = null
-      val thread = new Thread {
-        override def run() =
-          try {
-            mainObject.main(input)
-          } catch {
-            // Capture the exception to check whether the exception contains searchString or not
-            case e: Exception => exception = e
-          }
-      }
-      thread.start()
-      thread.join()
-      if (exitedCleanly) {
-        val joined = printStream.lineBuffer.mkString("\n")
-        assert(joined.contains(searchString))
-      } else {
-        assert(exception != null)
-        if (!exception.getMessage.contains(searchString)) {
-          throw exception
-        }
-      }
-    } finally {
-      mainObject.exitFn = original
-    }
-  }
-}
-
 class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
   import DiscoveryClientProvider._
 
@@ -149,7 +91,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       host,
       "--port",
       port)
-    testPrematureExit(args, "Only support expose Kyuubi server instance to another domain")
+    testPrematureExitForControlCli(
+      args,
+      "Only support expose Kyuubi server instance to another domain")
   }
 
   test("test not specified namespace") {
@@ -168,7 +112,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       host,
       "--port",
       port)
-    testPrematureExit(args2, "Only support expose Kyuubi server instance to another domain")
+    testPrematureExitForControlCli(
+      args2,
+      "Only support expose Kyuubi server instance to another domain")
   }
 
   test("test expose to another namespace") {
@@ -190,7 +136,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       host,
       "--port",
       port)
-    testPrematureExit(args, "")
+    testPrematureExitForControlCli(args, "")
   }
 
   test("test render zookeeper service node info") {
@@ -240,7 +186,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
         ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
         ServiceNodeInfo(s"/$newNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None))
 
-      testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
+      testPrematureExitForControlCli(
+        args,
+        getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
       val znodeRoot = s"/$newNamespace"
       val children = framework.getChildren(znodeRoot).sorted
       assert(children.size == 2)
@@ -264,7 +212,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--namespace",
       namespace)
     val scArgs1 = new ControlCliArguments(arg1)
-    assert(scArgs1.command.getZkNamespace() == s"/$namespace")
+    assert(CtlUtils.getZkNamespace(
+      scArgs1.command.conf,
+      scArgs1.command.normalizedCliConfig) == s"/$namespace")
 
     val arg2 = Array(
       "list",
@@ -276,7 +226,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--user",
       user)
     val scArgs2 = new ControlCliArguments(arg2)
-    assert(scArgs2.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs2.command.conf, scArgs2.command.normalizedCliConfig) ==
       s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
   }
 
@@ -305,7 +255,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
         ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
         ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None))
 
-      testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
+      testPrematureExitForControlCli(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
     }
   }
 
@@ -337,7 +287,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       val expectedNodes = Seq(
         ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None))
 
-      testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
+      testPrematureExitForControlCli(args, getRenderedNodesInfoWithoutTitle(expectedNodes, false))
     }
   }
 
@@ -371,7 +321,9 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       val expectedDeletedNodes = Seq(
         ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None))
 
-      testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedDeletedNodes, false))
+      testPrematureExitForControlCli(
+        args,
+        getRenderedNodesInfoWithoutTitle(expectedDeletedNodes, false))
     }
   }
 
@@ -401,7 +353,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
         ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10000, Some(KYUUBI_VERSION), None),
         ServiceNodeInfo(s"/$uniqueNamespace", "", "localhost", 10001, Some(KYUUBI_VERSION), None))
 
-      testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedNodes, true))
+      testPrematureExitForControlCli(args, getRenderedNodesInfoWithoutTitle(expectedNodes, true))
     }
   }
 
@@ -416,7 +368,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--user",
       user)
     val scArgs1 = new ControlCliArguments(arg1)
-    assert(scArgs1.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs1.command.conf, scArgs1.command.normalizedCliConfig) ==
       s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
 
     val arg2 = Array(
@@ -431,7 +383,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--engine-type",
       "FLINK_SQL")
     val scArgs2 = new ControlCliArguments(arg2)
-    assert(scArgs2.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs2.command.conf, scArgs2.command.normalizedCliConfig) ==
       s"/${namespace}_${KYUUBI_VERSION}_USER_FLINK_SQL/$user/default")
 
     val arg3 = Array(
@@ -446,7 +398,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--engine-type",
       "TRINO")
     val scArgs3 = new ControlCliArguments(arg3)
-    assert(scArgs3.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs3.command.conf, scArgs3.command.normalizedCliConfig) ==
       s"/${namespace}_${KYUUBI_VERSION}_USER_TRINO/$user/default")
 
     val arg4 = Array(
@@ -463,7 +415,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--engine-subdomain",
       "sub_1")
     val scArgs4 = new ControlCliArguments(arg4)
-    assert(scArgs4.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs4.command.conf, scArgs4.command.normalizedCliConfig) ==
       s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/sub_1")
 
     val arg5 = Array(
@@ -482,7 +434,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--engine-subdomain",
       "sub_1")
     val scArgs5 = new ControlCliArguments(arg5)
-    assert(scArgs5.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs5.command.conf, scArgs5.command.normalizedCliConfig) ==
       s"/${namespace}_1.5.0_USER_SPARK_SQL/$user/sub_1")
   }
 
@@ -497,7 +449,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--user",
       user)
     val scArgs1 = new ControlCliArguments(arg1)
-    assert(scArgs1.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs1.command.conf, scArgs1.command.normalizedCliConfig) ==
       s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
 
     val arg2 = Array(
@@ -512,7 +464,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--engine-share-level",
       "CONNECTION")
     val scArgs2 = new ControlCliArguments(arg2)
-    assert(scArgs2.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs2.command.conf, scArgs2.command.normalizedCliConfig) ==
       s"/${namespace}_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL/$user/default")
 
     val arg3 = Array(
@@ -527,7 +479,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--engine-share-level",
       "USER")
     val scArgs3 = new ControlCliArguments(arg3)
-    assert(scArgs3.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs3.command.conf, scArgs3.command.normalizedCliConfig) ==
       s"/${namespace}_${KYUUBI_VERSION}_USER_SPARK_SQL/$user/default")
 
     val arg4 = Array(
@@ -542,7 +494,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--engine-share-level",
       "GROUP")
     val scArgs4 = new ControlCliArguments(arg4)
-    assert(scArgs4.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs4.command.conf, scArgs4.command.normalizedCliConfig) ==
       s"/${namespace}_${KYUUBI_VERSION}_GROUP_SPARK_SQL/$user/default")
 
     val arg5 = Array(
@@ -557,7 +509,7 @@ class ControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
       "--engine-share-level",
       "SERVER")
     val scArgs5 = new ControlCliArguments(arg5)
-    assert(scArgs5.command.getZkNamespace() ==
+    assert(CtlUtils.getZkNamespace(scArgs5.command.conf, scArgs5.command.normalizedCliConfig) ==
       s"/${namespace}_${KYUUBI_VERSION}_SERVER_SPARK_SQL/$user/default")
   }
 }
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/TestPrematureExit.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/TestPrematureExit.scala
new file mode 100644
index 000000000..6477b9315
--- /dev/null
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/TestPrematureExit.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ctl
+
+import java.io.{OutputStream, PrintStream}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.ctl.util.CommandLineUtils
+
+/**
+ * Test helpers that run the kyuubi-ctl entry points on a separate thread and assert on what
+ * they print, throw or log.
+ */
+trait TestPrematureExit {
+  suite: KyuubiFunSuite =>
+
+  /** Output stream that discards every byte written to it. */
+  private val noOpOutputStream = new OutputStream {
+    override def write(b: Int): Unit = {}
+  }
+
+  /** Simple PrintStream that buffers the argument of each `println(Any)` call. */
+  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
+    val lineBuffer = ArrayBuffer[String]()
+    // scalastyle:off println
+    override def println(line: Any): Unit = {
+      lineBuffer += line.toString
+    }
+    // scalastyle:on println
+  }
+
+  /**
+   * Runs `mainObject.main(input)` on a separate thread with the print stream and exit function
+   * of `mainObject` replaced, then checks that `searchString` was reported.
+   *
+   * If the exit function was invoked, the captured lines joined with "\n" must contain
+   * `searchString`. Otherwise `main` must have thrown an exception whose message contains
+   * `searchString`; an exception that does not match is rethrown.
+   *
+   * @param input command line arguments passed to `main`
+   * @param searchString text expected in the captured output or in the exception message
+   * @param mainObject entry point under test
+   * @return the captured output if the exit function was invoked, otherwise an empty string
+   */
+  private[kyuubi] def testPrematureExitForControlCli(
+      input: Array[String],
+      searchString: String,
+      mainObject: CommandLineUtils = ControlCli): String = {
+    val printStream = new BufferPrintStream()
+    // Remember both hooks so that they are put back even when an assertion below fails.
+    val originalPrintStream = mainObject.printStream
+    val originalExitFn = mainObject.exitFn
+
+    @volatile var exitedCleanly = false
+    mainObject.printStream = printStream
+    mainObject.exitFn = (_) => exitedCleanly = true
+    try {
+      @volatile var exception: Exception = null
+      val thread = new Thread {
+        override def run(): Unit =
+          try {
+            mainObject.main(input)
+          } catch {
+            // Capture the exception to check whether the exception contains searchString or not
+            case e: Exception => exception = e
+          }
+      }
+      thread.start()
+      thread.join()
+      if (exitedCleanly) {
+        val joined = printStream.lineBuffer.mkString("\n")
+        assert(joined.contains(searchString))
+        joined
+      } else {
+        assert(exception != null)
+        // getMessage may be null; treat that as a mismatch rather than throwing an NPE.
+        if (!Option(exception.getMessage).exists(_.contains(searchString))) {
+          throw exception
+        }
+        ""
+      }
+    } finally {
+      mainObject.exitFn = originalExitFn
+      mainObject.printStream = originalPrintStream
+    }
+  }
+
+  /**
+   * Parses `args` with `ControlCliArguments` on a separate thread, passing any exception it
+   * throws to `error`, and asserts that a captured log event contains `searchString`.
+   *
+   * @param args command line arguments to parse
+   * @param searchString text expected in the formatted message of a log event
+   */
+  def testPrematureExitForControlCliArgs(args: Array[String], searchString: String): Unit = {
+    val logAppender = new LogAppender("test premature exit")
+    withLogAppender(logAppender) {
+      val thread = new Thread {
+        override def run(): Unit =
+          try {
+            new ControlCliArguments(args)
+          } catch {
+            case e: Exception =>
+              error(e)
+          }
+      }
+      thread.start()
+      thread.join()
+      assert(logAppender.loggingEvents.exists(
+        _.getMessage.getFormattedMessage.contains(searchString)))
+    }
+  }
+}
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
index 121579607..a9c064477 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
@@ -60,10 +60,10 @@ public class BatchRestApi {
     params.put("batchType", batchType);
     params.put("batchUser", batchUser);
     params.put("batchState", batchState);
-    if (null != createTime && createTime >= 0) {
+    if (null != createTime && createTime > 0) {
       params.put("createTime", createTime);
     }
-    if (null != endTime && endTime >= 0) {
+    if (null != endTime && endTime > 0) {
       params.put("endTime", endTime);
     }
     params.put("from", from);
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 4e2224e3b..d7ff2d0f7 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -226,6 +226,21 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-ctl_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-ctl_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
             <artifactId>kyuubi-download</artifactId>
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala
index 616b5c669..8317837cb 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/config/AllKyuubiConfiguration.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.kyuubi.{KyuubiFunSuite, TestUtils, Utils}
+import org.apache.kyuubi.ctl.CtlConf
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.metrics.MetricsConf
 import org.apache.kyuubi.server.statestore.jdbc.JDBCStateStoreConf
@@ -69,8 +70,9 @@ class AllKyuubiConfiguration extends KyuubiFunSuite {
 
   test("Check all kyuubi configs") {
     KyuubiConf
-    JDBCStateStoreConf
+    CtlConf
     HighAvailabilityConf
+    JDBCStateStoreConf
     MetricsConf
     ZookeeperConf
 
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
new file mode 100644
index 000000000..de662e5e8
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.rest.client
+
+import java.net.InetAddress
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.{RestClientTestHelper, Utils}
+import org.apache.kyuubi.client.api.v1.dto.BatchRequest
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SPARK_MAX_LIFETIME}
+import org.apache.kyuubi.ctl.TestPrematureExit
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.session.KyuubiSessionManager
+
+/**
+ * Tests for the kyuubi-ctl batch commands (create, get, log, delete, submit and list), run
+ * through `testPrematureExitForControlCli`.
+ */
+class BatchCliSuite extends RestClientTestHelper with TestPrematureExit {
+
+  val basePath: String = Utils.getCodeSourceLocation(getClass)
+  val batchFile: String = s"${basePath}/batch.yaml"
+  val appName: String = "test_batch"
+
+  /** Sets the kyuubi-ctl REST system properties and writes the batch request file. */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    // JVM-wide state: cleared again in afterAll so that it does not leak into other suites.
+    System.setProperty("kyuubi.ctl.rest.base.url", baseUri.toString)
+    System.setProperty("kyuubi.ctl.rest.spnego.host", "localhost")
+
+    val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+    val batchYaml =
+      s"""apiVersion: v1
+         |batchType: Spark
+         |username: ${ldapUser}
+         |request:
+         |  name: ${appName}
+         |  resource: ${sparkProcessBuilder.mainResource.get}
+         |  className: ${sparkProcessBuilder.mainClass}
+         |  args:
+         |   - x1
+         |   - x2
+         |  configs:
+         |    spark.master: local
+         |    spark.${ENGINE_SPARK_MAX_LIFETIME.key}: "5000"
+         |    spark.${ENGINE_CHECK_INTERVAL.key}: "1000"
+         |    k1: v1
+         |options:
+         |  verbose: true""".stripMargin
+    Files.write(Paths.get(batchFile), batchYaml.getBytes(StandardCharsets.UTF_8))
+  }
+
+  /** Undoes the JVM-wide and on-disk state created by beforeAll. */
+  override def afterAll(): Unit = {
+    try {
+      System.clearProperty("kyuubi.ctl.rest.base.url")
+      System.clearProperty("kyuubi.ctl.rest.spnego.host")
+      Files.deleteIfExists(Paths.get(batchFile))
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  /**
+   * Closes every session and, for each batch returned by the state store, kills its
+   * application and cleans up its metadata.
+   */
+  override def afterEach(): Unit = {
+    val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+    sessionManager.allSessions().foreach { session =>
+      sessionManager.closeSession(session.handle)
+    }
+    sessionManager.getBatchesFromStateStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach {
+      batch =>
+        sessionManager.applicationManager.killApplication(None, batch.getId)
+        sessionManager.cleanupMetadata(batch.getId)
+    }
+  }
+
+  /**
+   * Extracts the batch id from rendered batch output, i.e. the text between the first "Id: "
+   * and the end of that line.
+   */
+  private def extractBatchId(output: String): String = {
+    val marker = "Id: "
+    val markerIndex = output.indexOf(marker)
+    // Fail with the output attached instead of slicing from a bogus offset.
+    assert(markerIndex >= 0, s"No '$marker' field found in output:\n$output")
+    val startIndex = markerIndex + marker.length
+    val endIndex = output.indexOf("\n", startIndex)
+    // The id may be on the last line of the output, without a trailing newline.
+    if (endIndex >= 0) output.substring(startIndex, endIndex) else output.substring(startIndex)
+  }
+
+  /**
+   * Runs `create batch -f <batchFile>` followed by `authArgs`, checks the rendered batch and
+   * returns the id of the created batch.
+   */
+  private def createBatch(authArgs: String*): String = {
+    val createArgs = Array("create", "batch", "-f", batchFile) ++ authArgs
+    val result = testPrematureExitForControlCli(createArgs, "")
+    assert(result.contains("Type: SPARK"))
+    assert(result.contains(s"Kyuubi Instance: ${fe.connectionUrl}"))
+    extractBatchId(result)
+  }
+
+  /**
+   * Runs `get`, `log --size 2` and `delete` for `batchId`, appending `authArgs` to each
+   * command, and checks the output of each step.
+   */
+  private def getLogAndDeleteBatch(batchId: String, authArgs: Seq[String]): Unit = {
+    val getArgs = Array("get", "batch", batchId) ++ authArgs
+    val getResult = testPrematureExitForControlCli(getArgs, "Type: SPARK")
+    assert(getResult.contains("Type: SPARK"))
+    assert(getResult.contains(s"Kyuubi Instance: ${fe.connectionUrl}"))
+
+    val logArgs = Array("log", "batch", batchId, "--size", "2") ++ authArgs
+    val rows = testPrematureExitForControlCli(logArgs, "").split("\n")
+    assert(rows.length === 2)
+
+    val deleteArgs = Array("delete", "batch", batchId) ++ authArgs
+    testPrematureExitForControlCli(deleteArgs, "\"success\":true")
+  }
+
+  test("basic batch rest client") {
+    val batchId = createBatch("--password", ldapUserPasswd)
+    getLogAndDeleteBatch(batchId, Seq("--username", ldapUser, "--password", ldapUserPasswd))
+  }
+
+  test("spnego batch rest client") {
+    UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
+
+    val batchId = createBatch("--authSchema", "SPNEGO")
+    getLogAndDeleteBatch(batchId, Seq("--authSchema", "spnego"))
+  }
+
+  test("log batch test") {
+    val batchId = createBatch("--password", ldapUserPasswd)
+
+    val logArgs = Array(
+      "log",
+      "batch",
+      batchId,
+      "--size",
+      "100",
+      "--username",
+      ldapUser,
+      "--password",
+      ldapUserPasswd,
+      "--forward")
+    testPrematureExitForControlCli(logArgs, s"Submitted application: ${appName}")
+  }
+
+  test("submit batch test") {
+    val submitArgs = Array(
+      "submit",
+      "batch",
+      "-f",
+      batchFile,
+      "--password",
+      ldapUserPasswd)
+    testPrematureExitForControlCli(submitArgs, s"Submitted application: ${appName}")
+  }
+
+  test("list batch test") {
+    val sessionManager = server.frontendServices.head
+      .be.sessionManager.asInstanceOf[KyuubiSessionManager]
+    sessionManager.allSessions().foreach(_.close())
+
+    def openSparkBatchSession() =
+      sessionManager.openBatchSession(
+        "kyuubi",
+        "kyuubi",
+        InetAddress.getLocalHost.getCanonicalHostName,
+        Map.empty,
+        new BatchRequest("spark", "", "", ""))
+
+    def openInteractiveSession() =
+      sessionManager.openSession(
+        TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
+        "",
+        "",
+        "",
+        Map.empty)
+
+    // Three batch sessions interleaved with two non-batch sessions.
+    openSparkBatchSession()
+    openInteractiveSession()
+    openInteractiveSession()
+    openSparkBatchSession()
+    openSparkBatchSession()
+
+    val listArgs = Array(
+      "list",
+      "batch",
+      "--username",
+      ldapUser,
+      "--password",
+      ldapUserPasswd,
+      "--batchType",
+      "spark",
+      "--batchUser",
+      "kyuubi",
+      "--createTime",
+      "20220101000000")
+    testPrematureExitForControlCli(listArgs, "Total number of batches: 3")
+
+    val listArgs1 = Array(
+      "list",
+      "batch",
+      "--username",
+      ldapUser,
+      "--password",
+      ldapUserPasswd,
+      "--endTime",
+      "20220101000000")
+    testPrematureExitForControlCli(listArgs1, "Total number of batches: 0")
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index a168191b2..4deaefdd3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,6 +156,7 @@
         <scalatestplus.version>3.2.9.0</scalatestplus.version>
         <scopt.version>4.0.1</scopt.version>
         <slf4j.version>1.7.35</slf4j.version>
+        <snakeyaml.version>1.30</snakeyaml.version>
         <!--
           DO NOT forget to change the following properties when change the minor version of Spark:
           `delta.version`, `iceberg.name`, `maven.plugin.scalatest.exclude.tags`