You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/09/06 03:01:23 UTC

[inlong] branch master updated: [INLONG-4972][TubeMQ] Add a command-line tool for TubeMQ (#8835)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ddf19cce1 [INLONG-4972][TubeMQ] Add a command-line tool for TubeMQ  (#8835)
4ddf19cce1 is described below

commit 4ddf19cce1ef132fe812ba63df93a81fced6c85f
Author: Zfancy <47...@users.noreply.github.com>
AuthorDate: Wed Sep 6 11:01:18 2023 +0800

    [INLONG-4972][TubeMQ] Add a command-line tool for TubeMQ  (#8835)
---
 inlong-tubemq/bin/tubectl                          |  40 ++
 inlong-tubemq/bin/tubectl.cmd                      |  31 ++
 inlong-tubemq/tubemq-server/pom.xml                |   4 +
 .../tubemq/server/tools/cli/AbstractCommand.java   |  73 ++++
 .../server/tools/cli/AbstractCommandRunner.java    |  29 ++
 .../tubemq/server/tools/cli/CliWebapiAdmin.java    | 154 +++++++
 .../tubemq/server/tools/cli/CommandToolMain.java   |  72 ++++
 .../server/tools/cli/ConsumerGroupCommand.java     | 167 ++++++++
 .../tubemq/server/tools/cli/MessageCommand.java    | 432 ++++++++++++++++++++
 .../tubemq/server/tools/cli/TopicCommand.java      | 446 +++++++++++++++++++++
 .../server/tools/cli/ConsumerGroupCommandTest.java |  51 +++
 .../server/tools/cli/MessageCommandTest.java       |  68 ++++
 .../tubemq/server/tools/cli/TopicCommandTest.java  |  68 ++++
 licenses/inlong-tubemq-server/LICENSE              |   1 +
 licenses/inlong-tubemq-server/NOTICE               |  12 +
 .../licenses/LICENSE-jcommander.txt                | 202 ++++++++++
 16 files changed, 1850 insertions(+)

diff --git a/inlong-tubemq/bin/tubectl b/inlong-tubemq/bin/tubectl
new file mode 100755
index 0000000000..cb031a9363
--- /dev/null
+++ b/inlong-tubemq/bin/tubectl
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if [ -z "$BASE_DIR" ] ; then
+  PRG="$0"
+
+  # follow symlinks (absolute or relative) until PRG is the real script path
+  while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+      PRG="$link"
+    else
+      PRG="`dirname "$PRG"`/$link"
+    fi
+  done
+  BASE_DIR=`dirname "$PRG"`/..
+
+  # make it fully qualified
+  BASE_DIR=`cd "$BASE_DIR" && pwd`
+fi
+source "$BASE_DIR/bin/env.sh"
+# "$@" (quoted) forwards each argument unchanged, even if it contains spaces
+$JAVA $TOOLS_ARGS org.apache.inlong.tubemq.server.tools.cli.CommandToolMain "$@"
diff --git a/inlong-tubemq/bin/tubectl.cmd b/inlong-tubemq/bin/tubectl.cmd
new file mode 100644
index 0000000000..f6c0ebe240
--- /dev/null
+++ b/inlong-tubemq/bin/tubectl.cmd
@@ -0,0 +1,31 @@
+@rem
+@rem Licensed to the Apache Software Foundation (ASF) under one
+@rem or more contributor license agreements.  See the NOTICE file
+@rem distributed with this work for additional information
+@rem regarding copyright ownership.  The ASF licenses this file
+@rem to you under the Apache License, Version 2.0 (the
+@rem "License"); you may not use this file except in compliance
+@rem with the License.  You may obtain a copy of the License at
+@rem
+@rem   http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing,
+@rem software distributed under the License is distributed on an
+@rem "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@rem KIND, either express or implied.  See the License for the
+@rem specific language governing permissions and limitations
+@rem under the License.
+@rem
+
+REM Windows Command-line Tool for TubeMQ
+REM please do not change any command or variable in this script, check out
+REM env.cmd for details.
+
+setlocal
+call "%~dp0env.cmd"
+
+set TUBECTLMAIN=org.apache.inlong.tubemq.server.tools.cli.CommandToolMain
+REM forward every argument given to this script to the Java entry point
+echo on
+call %JAVA% %MASTER_JVM_OPTS% %GENERIC_ARGS% "%TUBECTLMAIN%" %*
+endlocal
\ No newline at end of file
diff --git a/inlong-tubemq/tubemq-server/pom.xml b/inlong-tubemq/tubemq-server/pom.xml
index b15950fb65..9521857a04 100644
--- a/inlong-tubemq/tubemq-server/pom.xml
+++ b/inlong-tubemq/tubemq-server/pom.xml
@@ -187,6 +187,10 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.beust</groupId>
+            <artifactId>jcommander</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommand.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommand.java
new file mode 100644
index 0000000000..0bb4d7e3ff
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommand.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+/**
+ * Base class of a tubectl command group; it dispatches to a registered sub-command runner.
+ */
+public abstract class AbstractCommand {
+
+    protected final JCommander jcommander;
+
+    @Parameter(names = {"-h", "--help"}, help = true, hidden = true)
+    private boolean help;
+
+    public AbstractCommand(String cmdName) {
+        this.jcommander = new JCommander();
+        this.jcommander.setProgramName("tubectl " + cmdName);
+    }
+
+    /** Returns true after requested usage output or a runner that completed, false otherwise. */
+    public boolean run(String[] args) {
+        // NOTE(review): nothing in this class adds `this` to jcommander, so `help` is
+        // presumably filled in by an enclosing parser before run() is called - confirm.
+        if (help) {
+            jcommander.usage();
+            return true;
+        }
+        try {
+            jcommander.parse(args);
+        } catch (Exception parseFailure) {
+            System.err.println(parseFailure.getMessage());
+            jcommander.usage();
+            return false;
+        }
+
+        final String selected = jcommander.getParsedCommand();
+        if (selected == null) {
+            jcommander.usage();
+            return false;
+        }
+        final JCommander sub = jcommander.getCommands().get(selected);
+        final AbstractCommandRunner runner = (AbstractCommandRunner) sub.getObjects().get(0);
+        try {
+            runner.run();
+        } catch (ParameterException badParam) {
+            System.err.println(badParam.getMessage() + System.lineSeparator());
+            return false;
+        } catch (Exception failure) {
+            failure.printStackTrace();
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommandRunner.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommandRunner.java
new file mode 100644
index 0000000000..0e4e0d1fe7
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/AbstractCommandRunner.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+/**
+ * The runner of a single sub-command; AbstractCommand looks one up and invokes it.
+ */
+public abstract class AbstractCommandRunner {
+
+    /**
+     * Execute this command; a failure is signalled by throwing.
+     */
+    abstract void run() throws Exception;
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliWebapiAdmin.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliWebapiAdmin.java
new file mode 100644
index 0000000000..26bfea7108
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliWebapiAdmin.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
+import org.apache.inlong.tubemq.server.common.fielddef.CliArgDef;
+import org.apache.inlong.tubemq.server.common.utils.HttpUtils;
+
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class is used to process Web API requests sent to the master.
+ */
+public class CliWebapiAdmin extends CliAbstractBase {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(CliWebapiAdmin.class);
+
+    private static final String defMasterPortal = "127.0.0.1:8080";
+
+    private Map<String, Object> requestParams = new HashMap<>();
+
+    public CliWebapiAdmin() {
+        super(null);
+        initCommandOptions();
+    }
+
+    /**
+     * Construct CliWebapiAdmin with request parameters
+     *
+     * @param requestParams Request parameters map, kept by reference; null means no parameters
+     */
+    public CliWebapiAdmin(Map<String, Object> requestParams) {
+        this();
+        this.requestParams = (requestParams != null) ? requestParams : new HashMap<String, Object>();
+    }
+
+    /**
+     * Init command options
+     */
+    @Override
+    protected void initCommandOptions() {
+        // add the cli required parameters
+        addCommandOption(CliArgDef.MASTERPORTAL);
+        addCommandOption(CliArgDef.ADMINMETHOD);
+        addCommandOption(CliArgDef.METHOD);
+    }
+
+    /**
+     * Call the Web API and print the formatted JSON reply.
+     * @param args CLI options, {"--method", "admin_query_topic_info"} as an example
+     * @return true after the reply of the requested method has been printed
+     * @throws Exception if the options cannot be parsed or a required value is blank
+     */
+    @Override
+    public boolean processParams(String[] args) throws Exception {
+        CommandLine cli = parser.parse(options, args);
+        if (cli == null) {
+            throw new ParseException("Parse args failure");
+        }
+        if (cli.hasOption(CliArgDef.VERSION.longOpt)) {
+            version();
+        }
+        if (cli.hasOption(CliArgDef.HELP.longOpt)) {
+            help();
+        }
+        String masterAddr = defMasterPortal;
+        if (cli.hasOption(CliArgDef.MASTERPORTAL.longOpt)) {
+            masterAddr = cli.getOptionValue(CliArgDef.MASTERPORTAL.longOpt);
+            if (TStringUtils.isBlank(masterAddr)) {
+                throw new Exception(CliArgDef.MASTERPORTAL.longOpt + " is required!");
+            }
+        }
+        JsonObject result = null;
+        String masterUrl = "http://" + masterAddr + "/webapi.htm";
+        if (cli.hasOption(CliArgDef.ADMINMETHOD.longOpt)) {
+            Map<String, String> inParamMap = new HashMap<>();
+            inParamMap.put(CliArgDef.METHOD.longOpt, "admin_get_methods");
+            result = HttpUtils.requestWebService(masterUrl, inParamMap);
+            System.out.println(formatResult(result));
+            System.exit(0);
+        }
+        String methodStr = cli.getOptionValue(CliArgDef.METHOD.longOpt);
+        if (TStringUtils.isBlank(methodStr)) {
+            throw new Exception(CliArgDef.METHOD.longOpt + " is required!");
+        }
+        requestParams.put(CliArgDef.METHOD.longOpt, methodStr);
+        Map<String, String> convertedRequestParams = convertRequestParams(requestParams);
+        result = HttpUtils.requestWebService(masterUrl, convertedRequestParams);
+        String formattedResult = formatResult(result);
+        System.out.println(formattedResult);
+        return true;
+    }
+
+    /**
+     * Convert request parameters map
+     *
+     * @param requestParamsMap Map
+     * @return a converted map
+     */
+    private Map<String, String> convertRequestParams(Map<String, Object> requestParamsMap) {
+        // convert object values to string ones
+        Map<String, String> convertedRequestParamsMap = new HashMap<>();
+        for (Map.Entry<String, Object> entry : requestParamsMap.entrySet()) {
+            convertedRequestParamsMap.put(entry.getKey(), String.valueOf(entry.getValue()));
+        }
+        return convertedRequestParamsMap;
+    }
+
+    /**
+     * Convert json content to specific output format
+     *
+     * @param result JsonObject
+     * @return formatted results
+     */
+    private String formatResult(JsonObject result) {
+        // format output results
+        return new GsonBuilder().setPrettyPrinting().create().toJson(result);
+    }
+
+    public static void main(String[] args) {
+        CliWebapiAdmin cliWebapiAdmin = new CliWebapiAdmin();
+        try {
+            cliWebapiAdmin.processParams(args);
+        } catch (Throwable ex) {
+            ex.printStackTrace();
+            logger.error(ex.getMessage());
+            cliWebapiAdmin.help();
+        }
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CommandToolMain.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CommandToolMain.java
new file mode 100644
index 0000000000..ba8acaa7cb
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CommandToolMain.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+import java.util.Arrays;
+
+/**
+ * Entry point of tubectl: selects a command group and hands it the remaining arguments.
+ */
+public class CommandToolMain {
+
+    private final JCommander jcommander;
+
+    @Parameter(names = {"-h", "--help"}, help = true, description = "Get all command about tubectl.")
+    boolean help;
+
+    CommandToolMain() {
+        this.jcommander = new JCommander();
+        this.jcommander.setProgramName("tubectl");
+        this.jcommander.addObject(this);
+        this.jcommander.addCommand("topic", new TopicCommand());
+        this.jcommander.addCommand("message", new MessageCommand());
+        this.jcommander.addCommand("group", new ConsumerGroupCommand());
+    }
+
+    /** Returns false on a parse error, true after printing usage, else the command group's result. */
+    boolean run(String[] args) {
+        try {
+            jcommander.parse(args);
+        } catch (Exception parseFailure) {
+            System.err.println(parseFailure.getMessage());
+            jcommander.usage();
+            return false;
+        }
+
+        final boolean usageOnly = help || args.length == 0;
+        if (usageOnly) {
+            jcommander.usage();
+            return true;
+        }
+
+        // args[0] names the command group; the rest of the arguments are passed on to it
+        final JCommander group = jcommander.getCommands().get(args[0]);
+        final AbstractCommand handler = (AbstractCommand) group.getObjects().get(0);
+        return handler.run(Arrays.copyOfRange(args, 1, args.length));
+    }
+
+    /** Exits with status 0 when the command succeeded and 1 otherwise. */
+    public static void main(String[] args) {
+        final boolean succeeded = new CommandToolMain().run(args);
+        System.exit(succeeded ? 0 : 1);
+    }
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommand.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommand.java
new file mode 100644
index 0000000000..28ab64909f
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommand.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The "group" command of tubectl: lists, creates and deletes consumer group records.
+ */
+@Parameters(commandDescription = "Command for consumer group")
+public class ConsumerGroupCommand extends AbstractCommand {
+
+    @Parameter()
+    private List<String> params;
+
+    private static final String[] requestMethod = new String[]{"--method", ""};
+    private static final Map<String, Object> requestParams = new HashMap<>();
+    private static final CliWebapiAdmin cliWebapiAdmin = new CliWebapiAdmin(requestParams);
+
+    public ConsumerGroupCommand() {
+        super("group");
+
+        jcommander.addCommand("list", new CgroupList());
+        jcommander.addCommand("create", new CgroupCreate());
+        jcommander.addCommand("delete", new CgroupDelete());
+    }
+
+    /**
+     * Adds {@code value} to the shared request parameters unless it is null.
+     * @param key   web API field name
+     * @param value value read from the command line, or null when the option was omitted
+     */
+    private static void putIfPresent(String key, Object value) {
+        if (value != null) {
+            requestParams.put(key, value);
+        }
+    }
+
+    /**
+     * Sends the shared request parameters (the map handed to cliWebapiAdmin) to a web API method.
+     * NOTE(review): a failure is only printed to stdout and not rethrown, so the
+     * caller cannot tell that the request failed - confirm this is intended.
+     *
+     * @param method web API method name
+     */
+    private static void invoke(String method) {
+        requestMethod[1] = method;
+        try {
+            cliWebapiAdmin.processParams(requestMethod);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+        }
+    }
+
+    @Parameters(commandDescription = "List consumer group")
+    private static class CgroupList extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params = new ArrayList<>();
+
+        @Parameter(names = {"-t", "--topic"}, order = 0, description = "Topic name")
+        private String topicName;
+
+        @Parameter(names = {"-g", "--group"}, order = 1, description = "Consumer group name")
+        private String groupName;
+
+        @Parameter(names = {"-c", "--creator"}, order = 3, description = "Record creator")
+        private String createUser;
+
+        @Override
+        void run() {
+            requestParams.clear();
+            putIfPresent(WebFieldDef.TOPICNAME.name, topicName);
+            putIfPresent(WebFieldDef.GROUPNAME.name, groupName);
+            putIfPresent(WebFieldDef.CREATEUSER.name, createUser);
+            invoke("admin_query_allowed_consumer_group_info");
+        }
+    }
+
+    @Parameters(commandDescription = "Create consumer group")
+    private static class CgroupCreate extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params = new ArrayList<>();
+
+        @Parameter(names = {"-t", "--topic"}, order = 0, required = true, description = "Topic name")
+        private String topicName;
+
+        @Parameter(names = {"-g",
+                "--group"}, order = 1, required = true, description = "Consumer group name")
+        private String groupName;
+
+        @Parameter(names = {"-at",
+                "--auth-token"}, order = 2, required = true, description = "Admin api operation authorization code")
+        private String confModAuthToken;
+
+        @Parameter(names = {"-c", "--creator"}, order = 3, required = true, description = "Record creator")
+        private String createUser;
+
+        @Parameter(names = {"-cd", "--create-date"}, order = 4, description = "Record creation date")
+        private String createDate;
+
+        @Override
+        void run() {
+            requestParams.clear();
+            putIfPresent(WebFieldDef.TOPICNAME.name, topicName);
+            putIfPresent(WebFieldDef.GROUPNAME.name, groupName);
+            putIfPresent(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+            putIfPresent(WebFieldDef.CREATEUSER.name, createUser);
+            putIfPresent(WebFieldDef.CREATEDATE.name, createDate);
+            invoke("admin_add_authorized_consumergroup_info");
+        }
+    }
+
+    @Parameters(commandDescription = "Delete consumer group")
+    private static class CgroupDelete extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params = new ArrayList<>();
+
+        @Parameter(names = {"-t", "--topic"}, order = 0, required = true, description = "Topic name")
+        private String topicName;
+
+        @Parameter(names = {"-at",
+                "--auth-token"}, order = 1, required = true, description = "Admin api operation authorization code")
+        private String confModAuthToken;
+
+        @Parameter(names = {"-m", "--modifier"}, required = true, order = 2, description = "Record modifier")
+        private String modifyUser;
+
+        @Parameter(names = {"-g", "--group"}, order = 3, description = "Consumer group name")
+        private String groupName;
+
+        @Override
+        void run() {
+            requestParams.clear();
+            putIfPresent(WebFieldDef.TOPICNAME.name, topicName);
+            putIfPresent(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+            putIfPresent(WebFieldDef.MODIFYUSER.name, modifyUser);
+            putIfPresent(WebFieldDef.GROUPNAME.name, groupName);
+            invoke("admin_delete_allowed_consumer_group_info");
+        }
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommand.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommand.java
new file mode 100644
index 0000000000..87534f7cce
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommand.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.apache.inlong.tubemq.client.common.ConfirmResult;
+import org.apache.inlong.tubemq.client.common.ConsumeResult;
+import org.apache.inlong.tubemq.client.common.PeerInfo;
+import org.apache.inlong.tubemq.client.common.QueryMetaResult;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.MessageListener;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import org.apache.commons.codec.binary.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Message production and consumption
+ */
+@Parameters(commandDescription = "Command for message production and consumption")
+public class MessageCommand extends AbstractCommand {
+
+    @Parameter()
+    private List<String> params;
+
+    public MessageCommand() {
+        super("message");
+
+        jcommander.addCommand("produce", new MessageProduce());
+        jcommander.addCommand("consume", new MessageConsume());
+    }
+
+    /**
+     * Reads lines from standard input and publishes every non-blank line as one message to a topic.
+     * Production ends when the requested number of messages has been submitted, when standard input
+     * reaches its end, or when the JVM shuts down.
+     */
+    @Parameters(commandDescription = "Produce message")
+    private static class MessageProduce extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params;
+
+        @Parameter(names = {"-ms",
+                "--master-servers"}, required = true, order = 1, description = "The master address(es) to connect to. Format is master1_ip:port[,master2_ip:port]")
+        private String masterServers;
+
+        @Parameter(names = {"-t",
+                "--topic"}, required = true, order = 0, description = "Topic to produce messages")
+        private String topicName;
+
+        @Parameter(names = {"-mt",
+                "--msg-total"}, order = 2, description = "The total number of messages to be produced. -1 means unlimited.")
+        private long msgTotal = -1;
+
+        @Parameter(names = {"-m", "--mode"}, order = 3, description = "Produce mode, must in { sync | async }")
+        private String mode = "async";
+
+        // Written by the thread executing run() and read by the shutdown hook thread.
+        private volatile MessageProducer messageProducer;
+
+        // Number of messages whose send was reported as successful.
+        private final AtomicLong msgCount = new AtomicLong(0L);
+
+        // True once stopProducer() has done its work; guarded by this object's monitor.
+        private boolean stopped = false;
+
+        /**
+         * Send messages in synchronous mode
+         *
+         * @param message  Message to send
+         * @throws TubeClientException
+         * @throws InterruptedException
+         */
+        private void syncProduce(Message message) throws TubeClientException, InterruptedException {
+            MessageSentResult result = messageProducer.sendMessage(message);
+            if (!result.isSuccess()) {
+                System.out.println("sync send message failed : " + result.getErrMsg());
+            } else {
+                msgCount.getAndIncrement();
+            }
+        }
+
+        /**
+         * Send messages in asynchronous mode
+         *
+         * @param message  Message to send
+         * @throws TubeClientException
+         * @throws InterruptedException
+         */
+        private void asyncProduce(Message message) throws TubeClientException, InterruptedException {
+            messageProducer.sendMessage(message, new MessageSentCallback() {
+
+                @Override
+                public void onMessageSent(MessageSentResult result) {
+                    if (!result.isSuccess()) {
+                        System.out.println("async send message failed : " + result.getErrMsg());
+                    } else {
+                        msgCount.getAndIncrement();
+                    }
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    System.out.println("async send message error : " + e);
+                }
+            });
+        }
+
+        /**
+         * Stop the producer (if one was created) and print the total number of messages produced.
+         * Only the first call has an effect, so the normal exit path and the shutdown hook can
+         * both call it without shutting the producer down twice or printing the summary twice.
+         *
+         * @param v  total number of messages to report
+         */
+        private synchronized void stopProducer(long v) {
+            if (stopped) {
+                return;
+            }
+            stopped = true;
+            final MessageProducer producer = messageProducer;
+            if (producer != null) {
+                try {
+                    producer.shutdown();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+            System.out.println("\n" + v + " message(s) has been produced. Exited.");
+        }
+
+        /**
+         * Connects to the master, then reads standard input line by line and sends each non-blank
+         * line in the configured mode. Errors are reported on standard output.
+         */
+        @Override
+        void run() {
+            try {
+                // Reject an unknown mode before connecting, not after the first line was typed.
+                final boolean syncMode = "sync".equals(mode);
+                if (!syncMode && !"async".equals(mode)) {
+                    throw new ParameterException("Produce mode, must in { sync | async }");
+                }
+
+                // Covers Ctrl-C and early failures; a no-op once the normal exit path has stopped the producer.
+                Runtime.getRuntime().addShutdownHook(new Thread(() -> stopProducer(msgCount.get())));
+
+                final TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
+                final TubeSingleSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
+                messageProducer = messageSessionFactory.createProducer();
+                messageProducer.publish(topicName);
+
+                BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
+                long submitted = 0;
+                while (msgTotal == -1 || submitted < msgTotal) {
+                    System.out.print(">");
+                    final String line = input.readLine();
+                    if (line == null) {
+                        // End of standard input (closed pipe or Ctrl-D): nothing more will ever arrive.
+                        break;
+                    }
+                    if (line.trim().isEmpty()) {
+                        continue;
+                    }
+                    // A fresh Message per line: an asynchronous send may still hold the previous one.
+                    final Message message = new Message(topicName, null);
+                    message.setData(StringUtils.getBytesUtf8(line));
+                    if (syncMode) {
+                        syncProduce(message);
+                    } else {
+                        asyncProduce(message);
+                    }
+                    submitted++;
+                }
+                stopProducer(msgTotal == -1 ? msgCount.get() : submitted);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to whoever runs this command.
+                Thread.currentThread().interrupt();
+                System.out.println(e.getMessage());
+            } catch (Exception e) {
+                System.out.println(e.getMessage());
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Consumes messages of a topic and prints every message body to standard output.
+     * A balance consumer is used when -po/--partitions-offsets is given or the mode is "balance";
+     * otherwise a pull or push consumer is used according to the mode.
+     */
+    @Parameters(commandDescription = "Consume message")
+    private static class MessageConsume extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params;
+
+        @Parameter(names = {"-ms",
+                "--master-servers"}, required = true, order = 2, description = "The master address(es) to connect to. Format is master1_ip:port[,master2_ip:port]")
+        private String masterServers;
+
+        @Parameter(names = {"-t",
+                "--topic"}, required = true, order = 0, description = "Topic to consume messages")
+        private String topicName;
+
+        @Parameter(names = {"-g", "--group"}, required = true, order = 1, description = "Consumer group")
+        private String groupName;
+
+        @Parameter(names = {"-m",
+                "--mode"}, order = 5, description = "Consume mode, must in { pull | push | balance }")
+        private String mode = "pull";
+
+        @Parameter(names = {"-p",
+                "--position"}, order = 3, description = "Consume position, must in { first | latest | max }")
+        private String consumePosition = "first";
+
+        @Parameter(names = {"-po",
+                "--partitions-offsets"}, order = 4, description = "Consume partition ids and their offsets, format is id1:offset1[,id2:offset2][...], for example: 0:0,1:0,2:0")
+        private String consumePartitionsAndOffsets;
+
+        // Written by the thread executing run() and read by the shutdown hook thread.
+        private volatile ClientBalanceConsumer clientBalanceConsumer;
+        private volatile PullMessageConsumer messagePullConsumer;
+        private volatile PushMessageConsumer messagePushConsumer;
+
+        // Number of messages printed so far.
+        private final AtomicLong msgCount = new AtomicLong(0L);
+
+        /**
+         * Print the body of every message and count it.
+         * Bodies are decoded as UTF-8, the encoding the produce command of this tool writes.
+         *
+         * @param messages  messages to print
+         */
+        private void printMessages(List<Message> messages) {
+            for (Message message : messages) {
+                System.out.println(StringUtils.newStringUtf8(message.getData()));
+                msgCount.getAndIncrement();
+            }
+        }
+
+        /**
+         * Parse the -po/--partitions-offsets option into a partition id to bootstrap offset map.
+         *
+         * @return partition ids and their offsets
+         * @throws ParameterException if the option is missing, blank or not in id:offset format
+         */
+        private Map<Long, Long> parsePartitionsAndOffsets() {
+            final String format = "format is id1:offset1[,id2:offset2][...], for example: 0:0,1:0,2:0";
+            if (consumePartitionsAndOffsets == null || consumePartitionsAndOffsets.trim().isEmpty()) {
+                throw new ParameterException(
+                        "Balance mode requires -po/--partitions-offsets, " + format);
+            }
+            final Map<Long, Long> partitionsAndOffsets = new HashMap<>();
+            for (String item : consumePartitionsAndOffsets.split(",")) {
+                final String[] splits = item.trim().split(":");
+                if (splits.length != 2) {
+                    throw new ParameterException("Invalid partition and offset \"" + item + "\", " + format);
+                }
+                try {
+                    partitionsAndOffsets.put(Long.parseLong(splits[0].trim()), Long.parseLong(splits[1].trim()));
+                } catch (NumberFormatException e) {
+                    throw new ParameterException("Invalid partition and offset \"" + item + "\", " + format);
+                }
+            }
+            return partitionsAndOffsets;
+        }
+
+        /**
+         * Translate the -p/--position option into the client's consume position.
+         *
+         * @return the consume position
+         * @throws ParameterException if the option is not one of first, latest or max
+         */
+        private ConsumePosition parseConsumePosition() {
+            if ("first".equals(consumePosition)) {
+                return ConsumePosition.CONSUMER_FROM_FIRST_OFFSET;
+            }
+            if ("latest".equals(consumePosition)) {
+                return ConsumePosition.CONSUMER_FROM_LATEST_OFFSET;
+            }
+            if ("max".equals(consumePosition)) {
+                return ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS;
+            }
+            throw new ParameterException("Consume position, must in { first | latest | max }");
+        }
+
+        /**
+         * Create a pullConsumer and consume messages until the JVM is stopped
+         *
+         * @param messageSessionFactory
+         * @param consumerConfig
+         * @throws TubeClientException
+         */
+        private void pullConsumer(MessageSessionFactory messageSessionFactory, ConsumerConfig consumerConfig)
+                throws TubeClientException {
+            messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
+            messagePullConsumer.subscribe(topicName, null);
+            messagePullConsumer.completeSubscribe();
+            while (!messagePullConsumer.isPartitionsReady(1000)) {
+                ThreadUtils.sleep(1000);
+            }
+            System.out.println("Ready to consume messages......");
+            while (true) {
+                ConsumerResult result = messagePullConsumer.getMessage();
+                if (result.isSuccess()) {
+                    printMessages(result.getMessageList());
+                    messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
+                }
+            }
+        }
+
+        /**
+         * Create a pushConsumer and consume messages until the calling thread is interrupted
+         * or the JVM is stopped
+         *
+         * @param messageSessionFactory
+         * @param consumerConfig
+         * @throws TubeClientException
+         * @throws InterruptedException
+         */
+        private void pushConsumer(MessageSessionFactory messageSessionFactory, ConsumerConfig consumerConfig)
+                throws TubeClientException, InterruptedException {
+            messagePushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
+            messagePushConsumer.subscribe(topicName, null, new MessageListener() {
+
+                @Override
+                public void receiveMessages(PeerInfo peerInfo, List<Message> messages) throws InterruptedException {
+                    printMessages(messages);
+                }
+
+                @Override
+                public Executor getExecutor() {
+                    return null;
+                }
+
+                @Override
+                public void stop() {
+                }
+            });
+            messagePushConsumer.completeSubscribe();
+            // Nothing counts this latch down: block here, as the pull and balance modes do,
+            // instead of silently ending consumption after a fixed time.
+            CountDownLatch latch = new CountDownLatch(1);
+            latch.await();
+        }
+
+        /**
+         * Create a clientBalanceConsumer, connect it to the requested partitions and consume messages
+         * until the JVM is stopped
+         *
+         * @param messageSessionFactory
+         * @param consumerConfig
+         * @param assignedPartitionsAndOffsets  partition ids to consume and their bootstrap offsets
+         * @throws TubeClientException
+         */
+        private void balanceConsumer(MessageSessionFactory messageSessionFactory, ConsumerConfig consumerConfig,
+                Map<Long, Long> assignedPartitionsAndOffsets) throws TubeClientException {
+            clientBalanceConsumer = messageSessionFactory.createBalanceConsumer(consumerConfig);
+            ProcessResult procResult = new ProcessResult();
+            QueryMetaResult qryResult = new QueryMetaResult();
+            final Map<String, TreeSet<String>> topicAndFiltersMap =
+                    MixedUtils.parseTopicParam(topicName);
+            if (!clientBalanceConsumer.start(topicAndFiltersMap, -1, 0, procResult)) {
+                System.out.println("Initial balance consumer failure, errcode is " + procResult.getErrCode()
+                        + " errMsg is " + procResult.getErrMsg());
+                return;
+            }
+            clientBalanceConsumer.getPartitionMetaInfo(qryResult);
+            Map<String, Boolean> partMetaInfoMap = qryResult.getPartStatusMap();
+            if (partMetaInfoMap == null || partMetaInfoMap.isEmpty()) {
+                System.out.println("No partitions of the topic are available now.");
+                return;
+            }
+
+            // connect to the partitions based on consumePartitionsAndOffsets parameters
+            Set<String> connectedPartitions = new HashSet<>();
+            for (Map.Entry<String, Boolean> entry : partMetaInfoMap.entrySet()) {
+                final String partKey = entry.getKey();
+                // The third ':'-separated field of the key is used as the partition id.
+                // NOTE(review): the meaning of the first two fields is not visible here.
+                final long parId = Long.parseLong(partKey.split(":")[2]);
+                final Long boostrapOffset = assignedPartitionsAndOffsets.get(parId);
+                if (!Boolean.TRUE.equals(entry.getValue()) || boostrapOffset == null) {
+                    continue;
+                }
+                if (clientBalanceConsumer.connect2Partition(partKey, boostrapOffset, procResult)) {
+                    connectedPartitions.add(partKey);
+                } else {
+                    System.out.println("connect2Partition failed. partition is " + partKey
+                            + ", errMsg is " + procResult.getErrMsg());
+                }
+            }
+            if (connectedPartitions.isEmpty()) {
+                // Waiting for readiness below would never end when nothing was connected.
+                System.out.println("None of the requested partitions is available now.");
+                return;
+            }
+
+            ConsumeResult csmResult = new ConsumeResult();
+            ConfirmResult cfmResult = new ConfirmResult();
+            while (!clientBalanceConsumer.isPartitionsReady(1000)) {
+                ThreadUtils.sleep(1000);
+            }
+            System.out.println("Ready to consume messages......");
+            while (true) {
+                // get messages
+                if (clientBalanceConsumer.getMessage(csmResult)) {
+                    printMessages(csmResult.getMessageList());
+                    // confirm messages to server
+                    clientBalanceConsumer.confirmConsume(csmResult.getConfirmContext(), true, cfmResult);
+                }
+            }
+        }
+
+        /**
+         * Validates the options, then starts the consumer selected by them. A shutdown hook stops
+         * whichever consumer was created and prints the number of consumed messages.
+         * Errors are reported on standard output.
+         */
+        @Override
+        void run() {
+            try {
+                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                    try {
+                        if (clientBalanceConsumer != null)
+                            clientBalanceConsumer.shutdown();
+                        if (messagePullConsumer != null)
+                            messagePullConsumer.shutdown();
+                        if (messagePushConsumer != null)
+                            messagePushConsumer.shutdown();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                    }
+                    System.out.println(msgCount.get() + " message(s) has been consumed. Exited.");
+                }));
+
+                // Check every option before a connection is opened so that mistakes fail fast.
+                final boolean useBalance = consumePartitionsAndOffsets != null || "balance".equals(mode);
+                if (!useBalance && !"pull".equals(mode) && !"push".equals(mode)) {
+                    throw new ParameterException("Consume mode, must in { pull | push | balance }");
+                }
+                final Map<Long, Long> partitionsAndOffsets = useBalance ? parsePartitionsAndOffsets() : null;
+                final ConsumePosition position = parseConsumePosition();
+
+                final ConsumerConfig consumerConfig = new ConsumerConfig(masterServers, groupName);
+                consumerConfig.setConsumePosition(position);
+                final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+                if (useBalance) {
+                    balanceConsumer(messageSessionFactory, consumerConfig, partitionsAndOffsets);
+                } else if ("pull".equals(mode)) {
+                    pullConsumer(messageSessionFactory, consumerConfig);
+                } else {
+                    pushConsumer(messageSessionFactory, consumerConfig);
+                }
+
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to whoever runs this command.
+                Thread.currentThread().interrupt();
+                System.out.println(e.getMessage());
+            } catch (Exception e) {
+                System.out.println(e.getMessage());
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommand.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommand.java
new file mode 100644
index 0000000000..c807d4b80c
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommand.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Topic management
+ */
+@Parameters(commandDescription = "Command for topic management")
+public class TopicCommand extends AbstractCommand {
+
+    // Unnamed (main) JCommander parameter; nothing in the visible part of this class reads it.
+    @Parameter()
+    private List<String> params;
+
+    // Argument array handed to cliWebapiAdmin.processParams(); each sub-command's run() overwrites
+    // element 1 with the web API method name before issuing a request.
+    final private static String[] requestMethod = new String[]{"--method", ""};
+
+    // Request parameters shared by every sub-command; each run() clears and refills this map
+    // before calling cliWebapiAdmin.processParams().
+    final private static Map<String, Object> requestParams = new HashMap<>();
+
+    // Web API client constructed around the shared requestParams map.
+    // NOTE(review): this state is static and mutable and no synchronization is visible here;
+    // presumably only one sub-command runs per process - confirm before reusing it concurrently.
+    final private static CliWebapiAdmin cliWebapiAdmin = new CliWebapiAdmin(requestParams);
+
+    public TopicCommand() {
+        super("topic");
+
+        jcommander.addCommand("list", new TopicList());
+        jcommander.addCommand("update", new TopicUpdate());
+        jcommander.addCommand("create", new TopicCreate());
+        jcommander.addCommand("delete", new TopicDelete());
+    }
+
+    /**
+     * Queries topic records through the "admin_query_topic_info" web API method.
+     * Only the options given on the command line are sent, plus the topic status id, so options
+     * the user did not specify do not restrict the query with client-side default values.
+     * NOTE(review): assumes the master treats an absent parameter as "no condition" - confirm
+     * against the TubeMQ HTTP API.
+     */
+    @Parameters(commandDescription = "List topic")
+    private static class TopicList extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params;
+
+        @Parameter(names = {"-t", "--topic"}, order = 0, description = "Topic name")
+        private String topicName;
+
+        @Parameter(names = {"-sid", "--topic-status-id"}, order = 1, description = "Topic status ID")
+        private int topicStatusId = 0;
+
+        @Parameter(names = {"-bid", "--broker-id"}, order = 2, description = "Brokers' ID, separated by commas")
+        private String brokerId;
+
+        @Parameter(names = {"-dp", "--delete-policy"}, order = 3, description = "File aging strategy")
+        private String deletePolicy;
+
+        // The numeric options below are null unless given on the command line.
+        @Parameter(names = {"-np", "--num-partitions"}, order = 4, description = "Number of partitions")
+        private Integer numPartitions;
+
+        @Parameter(names = {"-nts", "--num-topic-stores"}, order = 5, description = "Number of topic stores")
+        private Integer numTopicStores;
+
+        @Parameter(names = {"-uft",
+                "--unflush-threshold"}, order = 6, description = "Maximum allowed disk unflushing message count")
+        private Integer unflushThreshold;
+
+        @Parameter(names = {"-ufi",
+                "--unflush-interval"}, order = 7, description = "Maximum allowed disk unflushing interval")
+        private Integer unflushInterval;
+
+        @Parameter(names = {"-ufd",
+                "--unflush-datahold"}, order = 8, description = "Maximum allowed disk unflushing data size")
+        private Integer unflushDataHold;
+
+        @Parameter(names = {"-mc",
+                "--memcache-msgcnt-ink"}, order = 9, description = "Maximum allowed memory cache unflushing message count")
+        private Integer memCacheMsgCntInK;
+
+        @Parameter(names = {"-ms",
+                "--memcache-msgsize-inmb"}, order = 10, description = "Maximum allowed memory cache size in MB")
+        private Integer memCacheMsgSizeInMB;
+
+        @Parameter(names = {"-mfi",
+                "--memcache-flush-intvl"}, order = 11, description = "Maximum allowed memory cache flushing interval")
+        private Integer memCacheFlushIntvl;
+
+        @Parameter(names = {"-c", "--creator"}, order = 12, description = "Record creator")
+        private String createUser;
+
+        @Parameter(names = {"-m", "--modifier"}, order = 13, description = "Record modifier")
+        private String modifyUser;
+
+        /**
+         * Adds one option to the shared request parameters, skipping options that were not supplied.
+         *
+         * @param key    request parameter name
+         * @param value  option value, null when the option was not given
+         */
+        private void putIfPresent(String key, Object value) {
+            if (value != null) {
+                requestParams.put(key, value);
+            }
+        }
+
+        /**
+         * Builds the query from the supplied options and sends it.
+         * Any exception is reported by printing its message to standard output.
+         */
+        @Override
+        void run() {
+            try {
+                requestMethod[1] = "admin_query_topic_info";
+                requestParams.clear();
+                putIfPresent(WebFieldDef.TOPICNAME.name, topicName);
+                // The status id keeps its default of 0 and is always part of the query.
+                requestParams.put(WebFieldDef.TOPICSTATUSID.name, topicStatusId);
+                putIfPresent(WebFieldDef.BROKERID.name, brokerId);
+                putIfPresent(WebFieldDef.DELETEPOLICY.name, deletePolicy);
+                putIfPresent(WebFieldDef.NUMPARTITIONS.name, numPartitions);
+                putIfPresent(WebFieldDef.NUMTOPICSTORES.name, numTopicStores);
+                putIfPresent(WebFieldDef.UNFLUSHTHRESHOLD.name, unflushThreshold);
+                putIfPresent(WebFieldDef.UNFLUSHINTERVAL.name, unflushInterval);
+                putIfPresent(WebFieldDef.UNFLUSHDATAHOLD.name, unflushDataHold);
+                putIfPresent(WebFieldDef.UNFMCACHECNTINK.name, memCacheMsgCntInK);
+                putIfPresent(WebFieldDef.MCACHESIZEINMB.name, memCacheMsgSizeInMB);
+                putIfPresent(WebFieldDef.UNFMCACHEINTERVAL.name, memCacheFlushIntvl);
+                putIfPresent(WebFieldDef.CREATEUSER.name, createUser);
+                putIfPresent(WebFieldDef.MODIFYUSER.name, modifyUser);
+                cliWebapiAdmin.processParams(requestMethod);
+            } catch (Exception e) {
+                System.out.println(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Modifies a topic record through the "admin_modify_topic_info" web API method and then asks
+     * the affected brokers to reload their configuration ("admin_reload_broker_configure").
+     * Only the settings given on the command line are sent, so updating one property does not
+     * overwrite the others with client-side default values.
+     * NOTE(review): assumes the master keeps the stored value of a setting that is absent from
+     * the request - confirm against the TubeMQ HTTP API.
+     */
+    @Parameters(commandDescription = "Update topic")
+    private static class TopicUpdate extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params;
+
+        @Parameter(names = {"-t", "--topic"}, order = 0, required = true, description = "Topic name")
+        private String topicName;
+
+        @Parameter(names = {"-bid",
+                "--broker-id"}, order = 1, required = true, description = "Brokers' ID, separated by commas")
+        private String brokerId;
+
+        @Parameter(names = {"-dp", "--delete-policy"}, order = 4, description = "File aging strategy")
+        private String deletePolicy;
+
+        // The optional settings below are null unless given on the command line.
+        @Parameter(names = {"-np", "--num-partitions"}, order = 5, description = "Number of partitions")
+        private Integer numPartitions;
+
+        @Parameter(names = {"-uft",
+                "--unflush-threshold"}, order = 6, description = "Maximum allowed disk unflushing message count")
+        private Integer unflushThreshold;
+
+        @Parameter(names = {"-ufi",
+                "--unflush-interval"}, order = 7, description = "Maximum allowed disk unflushing interval")
+        private Integer unflushInterval;
+
+        @Parameter(names = {"-ufd",
+                "--unflush-datahold"}, order = 8, description = "Maximum allowed disk unflushing data size")
+        private Integer unflushDataHold;
+
+        @Parameter(names = {"-nts", "--num-topic-stores"}, order = 9, description = "Number of topic stores")
+        private Integer numTopicStores;
+
+        @Parameter(names = {"-mc",
+                "--memcache-msgcnt-ink"}, order = 10, description = "Maximum allowed memory cache unflushing message count")
+        private Integer memCacheMsgCntInK;
+
+        @Parameter(names = {"-ms",
+                "--memcache-msgsize-inmb"}, order = 11, description = "Maximum allowed memory cache size in MB")
+        private Integer memCacheMsgSizeInMB;
+
+        @Parameter(names = {"-mfi",
+                "--memcache-flush-intvl"}, order = 12, description = "Maximum allowed memory cache flushing interval")
+        private Integer memCacheFlushIntvl;
+
+        // arity = 1 makes the value explicit ("-ap false"); a bare boolean flag cannot express both states.
+        @Parameter(names = {"-ap",
+                "--accept-publish"}, order = 13, arity = 1, description = "Enable publishing, must in { true | false }")
+        private Boolean acceptPublish;
+
+        @Parameter(names = {"-as",
+                "--accept-subscribe"}, order = 14, arity = 1, description = "Enable subscription, must in { true | false }")
+        private Boolean acceptSubscribe;
+
+        @Parameter(names = {"-mms",
+                "--max-msgsize-inmb"}, order = 15, description = "Maximum allowed message length, unit MB")
+        private Integer maxMsgSizeInMB;
+
+        @Parameter(names = {"-m", "--modifier"}, order = 2, required = true, description = "Record modifier")
+        private String modifyUser;
+
+        @Parameter(names = {"-md", "--modify-date"}, order = 16, description = "Record modification date")
+        private String modifyDate;
+
+        @Parameter(names = {"-at",
+                "--auth-token"}, order = 3, required = true, description = "Admin api operation authorization code")
+        private String confModAuthToken;
+
+        /**
+         * Adds one option to the shared request parameters, skipping options that were not supplied.
+         *
+         * @param key    request parameter name
+         * @param value  option value, null when the option was not given
+         */
+        private void putIfPresent(String key, Object value) {
+            if (value != null) {
+                requestParams.put(key, value);
+            }
+        }
+
+        /**
+         * Sends the modify request followed by the broker reload request.
+         * Any exception is reported by printing its message to standard output.
+         */
+        @Override
+        void run() {
+            try {
+                requestMethod[1] = "admin_modify_topic_info";
+                requestParams.clear();
+                putIfPresent(WebFieldDef.TOPICNAME.name, topicName);
+                putIfPresent(WebFieldDef.BROKERID.name, brokerId);
+                putIfPresent(WebFieldDef.DELETEPOLICY.name, deletePolicy);
+                putIfPresent(WebFieldDef.NUMPARTITIONS.name, numPartitions);
+                putIfPresent(WebFieldDef.UNFLUSHTHRESHOLD.name, unflushThreshold);
+                putIfPresent(WebFieldDef.UNFLUSHINTERVAL.name, unflushInterval);
+                putIfPresent(WebFieldDef.UNFLUSHDATAHOLD.name, unflushDataHold);
+                putIfPresent(WebFieldDef.NUMTOPICSTORES.name, numTopicStores);
+                putIfPresent(WebFieldDef.UNFMCACHECNTINK.name, memCacheMsgCntInK);
+                putIfPresent(WebFieldDef.MCACHESIZEINMB.name, memCacheMsgSizeInMB);
+                putIfPresent(WebFieldDef.UNFMCACHEINTERVAL.name, memCacheFlushIntvl);
+                putIfPresent(WebFieldDef.ACCEPTPUBLISH.name, acceptPublish);
+                putIfPresent(WebFieldDef.ACCEPTSUBSCRIBE.name, acceptSubscribe);
+                putIfPresent(WebFieldDef.MAXMSGSIZEINMB.name, maxMsgSizeInMB);
+                putIfPresent(WebFieldDef.MODIFYUSER.name, modifyUser);
+                putIfPresent(WebFieldDef.MODIFYDATE.name, modifyDate);
+                putIfPresent(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+                cliWebapiAdmin.processParams(requestMethod);
+
+                System.out.println("Reloading broker configure...");
+                requestParams.clear();
+                requestMethod[1] = "admin_reload_broker_configure";
+                putIfPresent(WebFieldDef.BROKERID.name, brokerId);
+                putIfPresent(WebFieldDef.MODIFYUSER.name, modifyUser);
+                putIfPresent(WebFieldDef.MODIFYDATE.name, modifyDate);
+                putIfPresent(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+                cliWebapiAdmin.processParams(requestMethod);
+            } catch (Exception e) {
+                System.out.println(e.getMessage());
+            }
+        }
+    }
+
+    @Parameters(commandDescription = "Create topic")
+    private static class TopicCreate extends AbstractCommandRunner {
+        // Turns the options below into one "admin_add_new_topic_record" request sent through cliWebapiAdmin.
+        @Parameter()
+        private List<String> params = new ArrayList<>();
+
+        @Parameter(names = {"-t", "--topic"}, order = 0, required = true, description = "Topic name")
+        private String topicName;
+
+        @Parameter(names = {"-bid",
+                "--broker-id"}, order = 1, required = true, description = "Brokers' ID, separated by commas")
+        private String brokerId;
+
+        @Parameter(names = {"-dp", "--delete-policy"}, order = 4, description = "File aging strategy")
+        private String deletePolicy;
+
+        @Parameter(names = {"-np", "--num-partitions"}, order = 5, description = "Number of partitions")
+        private int numPartitions = -1;
+
+        @Parameter(names = {"-uft",
+                "--unflush-threshold"}, order = 6, description = "Maximum allowed disk unflushing message count")
+        private int unflushThreshold = -1;
+
+        @Parameter(names = {"-ufi",
+                "--unflush-interval"}, order = 7, description = "Maximum allowed disk unflushing interval")
+        private int unflushInterval = -1;
+
+        @Parameter(names = {"-ufd",
+                "--unflush-datahold"}, order = 8, description = "Maximum allowed disk unflushing data size")
+        private int unflushDataHold = 0;
+
+        @Parameter(names = {"-nts", "--num-topic-stores"}, order = 9, description = "Number of topic stores")
+        private int numTopicStores = 1;
+
+        @Parameter(names = {"-mc",
+                "--memcache-msgcnt-ink"}, order = 10, description = "Maximum allowed memory cache unflushing message count")
+        private int memCacheMsgCntInK = 10;
+
+        @Parameter(names = {"-ms",
+                "--memcache-msgsize-inmb"}, order = 11, description = "Maximum allowed memory cache size in MB")
+        private int memCacheMsgSizeInMB = 2;
+
+        @Parameter(names = {"-mfi",
+                "--memcache-flush-intvl"}, order = 12, description = "Maximum allowed memory cache unflushing interval")
+        private int memCacheFlushIntvl = 20000;
+
+        @Parameter(names = {"-ap", "--accept-publish"}, order = 13, description = "Enable publishing")
+        private boolean acceptPublish = true;
+
+        @Parameter(names = {"-as", "--accept-subscribe"}, order = 14, description = "Enable subscription")
+        private boolean acceptSubscribe = true;
+
+        @Parameter(names = {"-mms",
+                "--max-msgsize-inmb"}, order = 15, description = "Maximum allowed message length, unit MB")
+        private int maxMsgSizeInMB = 1;
+
+        @Parameter(names = {"-c", "--creator"}, order = 2, required = true, description = "Record creator")
+        private String createUser;
+
+        @Parameter(names = {"-cd", "--create-date"}, order = 16, description = "Record creation date")
+        private String createDate;
+
+        @Parameter(names = {"-at",
+                "--auth-token"}, order = 3, required = true, description = "Admin api operation authorization code")
+        private String confModAuthToken;
+
+        @Override
+        void run() {
+            try {
+                requestMethod[1] = "admin_add_new_topic_record";
+                requestParams.clear(); // start this request from an empty parameter set
+                if (topicName != null)
+                    requestParams.put(WebFieldDef.TOPICNAME.name, topicName);
+                if (brokerId != null)
+                    requestParams.put(WebFieldDef.BROKERID.name, brokerId);
+                if (deletePolicy != null)
+                    requestParams.put(WebFieldDef.DELETEPOLICY.name, deletePolicy);
+                if (numPartitions != -1) // -1 is the field default, i.e. the option was not supplied
+                    requestParams.put(WebFieldDef.NUMPARTITIONS.name, numPartitions);
+                if (unflushThreshold != -1)
+                    requestParams.put(WebFieldDef.UNFLUSHTHRESHOLD.name, unflushThreshold);
+                if (unflushInterval != -1)
+                    requestParams.put(WebFieldDef.UNFLUSHINTERVAL.name, unflushInterval);
+                requestParams.put(WebFieldDef.UNFLUSHDATAHOLD.name, unflushDataHold);
+                requestParams.put(WebFieldDef.NUMTOPICSTORES.name, numTopicStores);
+                requestParams.put(WebFieldDef.UNFMCACHECNTINK.name, memCacheMsgCntInK);
+                requestParams.put(WebFieldDef.MCACHESIZEINMB.name, memCacheMsgSizeInMB);
+                requestParams.put(WebFieldDef.UNFMCACHEINTERVAL.name, memCacheFlushIntvl);
+                requestParams.put(WebFieldDef.ACCEPTPUBLISH.name, acceptPublish);
+                requestParams.put(WebFieldDef.ACCEPTSUBSCRIBE.name, acceptSubscribe);
+                requestParams.put(WebFieldDef.MAXMSGSIZEINMB.name, maxMsgSizeInMB);
+                if (createUser != null)
+                    requestParams.put(WebFieldDef.CREATEUSER.name, createUser);
+                if (createDate != null)
+                    requestParams.put(WebFieldDef.CREATEDATE.name, createDate);
+                if (confModAuthToken != null)
+                    requestParams.put(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+                cliWebapiAdmin.processParams(requestMethod);
+            } catch (Exception e) {
+                System.out.println(e.getMessage()); // failures are only reported on stdout
+            }
+        }
+    }
+
+    @Parameters(commandDescription = "Delete topic")
+    private static class TopicDelete extends AbstractCommandRunner {
+        // Deletes a topic in the mode chosen by --delete-opt: softDelete, redoDelete or hardDelete below.
+        @Parameter()
+        private List<String> params = new ArrayList<>();
+
+        @Parameter(names = {"-o",
+                "--delete-opt"}, order = 0, description = "Delete options, must in { soft | redo | hard }")
+        private String deleteOpt = "soft";
+
+        @Parameter(names = {"-t", "--topic"}, order = 1, required = true, description = "Topic name")
+        private String topicName;
+
+        @Parameter(names = {"-bid",
+                "--broker-id"}, order = 2, required = true, description = "Brokers' ID, separated by commas")
+        private String brokerId;
+
+        @Parameter(names = {"-m", "--modifier"}, order = 3, required = true, description = "Record modifier")
+        private String modifyUser;
+
+        @Parameter(names = {"-md", "--modify-date"}, order = 5, description = "Record modification date")
+        private String modifyDate;
+
+        @Parameter(names = {"-at",
+                "--auth-token"}, order = 4, required = true, description = "Admin api operation authorization code")
+        private String confModAuthToken;
+
+        private void softDelete() throws Exception {
+            System.out.println("Turning publish and subscribe status to false...");
+            requestMethod[1] = "admin_modify_topic_info"; // step 1: switch publish/subscribe off
+            requestParams.put(WebFieldDef.ACCEPTPUBLISH.name, false);
+            requestParams.put(WebFieldDef.ACCEPTSUBSCRIBE.name, false);
+            cliWebapiAdmin.processParams(requestMethod);
+            requestParams.remove(WebFieldDef.ACCEPTPUBLISH.name); // flags are removed again before step 2
+            requestParams.remove(WebFieldDef.ACCEPTSUBSCRIBE.name);
+
+            System.out.println("Beginning to soft delete...");
+            requestMethod[1] = "admin_delete_topic_info"; // step 2: same requestParams minus the two flags
+            cliWebapiAdmin.processParams(requestMethod);
+        }
+
+        private void redoDelete() throws Exception {
+            requestMethod[1] = "admin_redo_deleted_topic_info"; // single call, requestParams as filled by run()
+            cliWebapiAdmin.processParams(requestMethod);
+        }
+
+        private void hardDelete() throws Exception {
+            softDelete(); // hard delete always runs the full soft-delete sequence first
+
+            System.out.println("Beginning to hard delete...");
+            requestMethod[1] = "admin_remove_topic_info";
+            cliWebapiAdmin.processParams(requestMethod);
+        }
+
+        @Override
+        void run() {
+            try {
+                requestParams.clear(); // fill the parameters shared by every delete mode
+                if (topicName != null)
+                    requestParams.put(WebFieldDef.TOPICNAME.name, topicName);
+                if (brokerId != null)
+                    requestParams.put(WebFieldDef.BROKERID.name, brokerId);
+                if (modifyUser != null)
+                    requestParams.put(WebFieldDef.MODIFYUSER.name, modifyUser);
+                if (modifyDate != null)
+                    requestParams.put(WebFieldDef.MODIFYDATE.name, modifyDate);
+                if (confModAuthToken != null)
+                    requestParams.put(WebFieldDef.ADMINAUTHTOKEN.name, confModAuthToken);
+                switch (deleteOpt) {
+                    case "soft":
+                        softDelete();
+                        break;
+                    case "redo":
+                        redoDelete();
+                        break;
+                    case "hard":
+                        hardDelete();
+                        break;
+                    default:
+                        throw new ParameterException("delete option must in { soft | redo | hard }");
+                }
+            } catch (Exception e) { // also catches the ParameterException thrown above for an unknown option
+                System.out.println(e.getMessage());
+            }
+        }
+
+    }
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommandTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommandTest.java
new file mode 100644
index 0000000000..c4f2603074
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/ConsumerGroupCommandTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConsumerGroupCommandTest {
+    // Runs each "group" sub-command through CommandToolMain and asserts that run(...) returns true.
+    CommandToolMain tubectlTool;
+
+    @Before
+    public void setUp() {
+        this.tubectlTool = new CommandToolMain();
+    }
+
+    @Test
+    public void testConsumerGroupCreate() {
+        String[] createCmd = {"group", "create", "-t", "b4t1", "-g", "b4t1g1", "-at", "abc",
+                "-c", "admin", "-cd", "20151117151129"};
+        Assert.assertTrue(tubectlTool.run(createCmd));
+    }
+
+    @Test
+    public void testConsumerGroupList() {
+        String[] listCmd = {"group", "list", "-t", "b4t1", "-g", "b4t1g1", "-c", "admin"};
+        Assert.assertTrue(tubectlTool.run(listCmd));
+    }
+
+    @Test
+    public void testConsumerGroupDelete() {
+        String[] deleteCmd = {"group", "delete", "-t", "b4t1", "-at", "abc", "-m", "admin", "-g", "b4t1g1"};
+        Assert.assertTrue(tubectlTool.run(deleteCmd));
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommandTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommandTest.java
new file mode 100644
index 0000000000..a43ed58a22
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/MessageCommandTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class MessageCommandTest {
+    // Each test replaces System.in with the message text before invoking "message produce".
+    CommandToolMain tubectlTool = null;
+
+    @Before
+    public void setUp() {
+        tubectlTool = new CommandToolMain();
+    }
+
+    @Test
+    public void testMessageProduceSync() throws UnknownHostException, InterruptedException {
+        String masterservers = InetAddress.getLocalHost().getHostAddress() + ":" + 8715;
+        String messageBody = "This is a message from testMessageProduceSync.";
+        InputStream originalIn = System.in;
+        System.setIn(new ByteArrayInputStream(messageBody.getBytes()));
+        try {
+            String[] arg = {"message", "produce", "-ms", masterservers, "-t", "b4t4", "-m", "sync", "-mt", "1"};
+            Assert.assertTrue(tubectlTool.run(arg));
+        } finally {
+            // System.in is JVM-wide: put the previous stream back so later tests do not inherit this buffer
+            System.setIn(originalIn);
+        }
+    }
+
+    @Test
+    public void testMessageProduceAsync() throws UnknownHostException, InterruptedException {
+        String masterservers = InetAddress.getLocalHost().getHostAddress() + ":" + 8715;
+        String messageBody = "This is a message from testMessageProduceAsync.";
+        InputStream originalIn = System.in;
+        System.setIn(new ByteArrayInputStream(messageBody.getBytes()));
+        try {
+            String[] arg = {"message", "produce", "-ms", masterservers, "-t", "b4t4", "-m", "async", "-mt", "1"};
+            Assert.assertTrue(tubectlTool.run(arg));
+        } finally {
+            // System.in is JVM-wide: put the previous stream back so later tests do not inherit this buffer
+            System.setIn(originalIn);
+        }
+    }
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommandTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommandTest.java
new file mode 100644
index 0000000000..b730359f82
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/tools/cli/TopicCommandTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.tools.cli;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TopicCommandTest {
+    // Runs each "topic" sub-command through CommandToolMain and asserts that run(...) returns true.
+    CommandToolMain tubectlTool;
+
+    @Before
+    public void setUp() {
+        this.tubectlTool = new CommandToolMain();
+    }
+
+    @Test
+    public void testTopicCreate() {
+        String[] cmd = {"topic", "create", "-t", "b4t1", "-bid", "4", "-c", "admin", "-at", "abc"};
+        Assert.assertTrue(tubectlTool.run(cmd));
+    }
+
+    @Test
+    public void testTopicList() {
+        String[] cmd = {"topic", "list"};
+        Assert.assertTrue(tubectlTool.run(cmd));
+    }
+
+    @Test
+    public void testTopicUpdate() {
+        String[] cmd = {"topic", "update", "-t", "b4t1", "-bid", "4", "-m", "admin", "-at", "abc"};
+        Assert.assertTrue(tubectlTool.run(cmd));
+    }
+
+    @Test
+    public void testTopicDeleteSoft() {
+        String[] cmd = {"topic", "delete", "-o", "soft", "-t", "b4t1", "-bid", "4", "-m", "admin", "-at", "abc"};
+        Assert.assertTrue(tubectlTool.run(cmd));
+    }
+
+    @Test
+    public void testTopicDeleteRedo() {
+        String[] cmd = {"topic", "delete", "-o", "redo", "-t", "b4t1", "-bid", "4", "-m", "admin", "-at", "abc"};
+        Assert.assertTrue(tubectlTool.run(cmd));
+    }
+
+    @Test
+    public void testTopicDeleteHard() {
+        String[] cmd = {"topic", "delete", "-o", "hard", "-t", "b4t1", "-bid", "4", "-m", "admin", "-at", "abc"};
+        Assert.assertTrue(tubectlTool.run(cmd));
+    }
+}
diff --git a/licenses/inlong-tubemq-server/LICENSE b/licenses/inlong-tubemq-server/LICENSE
index b5418502d8..abdf55808b 100644
--- a/licenses/inlong-tubemq-server/LICENSE
+++ b/licenses/inlong-tubemq-server/LICENSE
@@ -418,6 +418,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 
   log4j:log4j:1.2.17 - Apache Log4j (http://logging.apache.org/log4j/1.2/), (The Apache Software License, Version 2.0)
   org.apache.logging.log4j:log4j-core:2.17.2 - Apache Log4j Core (https://logging.apache.org/log4j/2.x/log4j-core/), (Apache License, Version 2.0)
+  com.beust:jcommander:1.78 - jcommander (https://github.com/cbeust/jcommander/tree/1.78), (Apache License, Version 2.0)
   org.eclipse.jetty:jetty-http:9.4.48.v20220622 - Jetty :: Http Utility (http://www.eclipse.org/jetty), (Apache Software License - Version 2.0), (Apache 2.0 and EPL 1.0)
   org.eclipse.jetty:jetty-io:9.4.48.v20220622 - Jetty :: IO Utility (http://www.eclipse.org/jetty), (Apache Software License - Version 2.0), (Apache 2.0 and EPL 1.0)
   org.eclipse.jetty:jetty-security:9.4.48.v20220622 - Jetty :: Security (http://www.eclipse.org/jetty), (Apache Software License - Version 2.0), (Apache 2.0 and EPL 1.0)
diff --git a/licenses/inlong-tubemq-server/NOTICE b/licenses/inlong-tubemq-server/NOTICE
index c5ea4b97d6..bec5a8bbe9 100644
--- a/licenses/inlong-tubemq-server/NOTICE
+++ b/licenses/inlong-tubemq-server/NOTICE
@@ -215,6 +215,18 @@ limitations under the License.
 
 
 
+========================================================================
+
+jcommander NOTICE
+========================================================================
+
+JCommander Copyright Notices
+============================
+
+Copyright 2010 Cedric Beust <ce...@beust.com>
+
+
+
 ========================================================================
 
 Apache Log4j NOTICE
diff --git a/licenses/inlong-tubemq-server/licenses/LICENSE-jcommander.txt b/licenses/inlong-tubemq-server/licenses/LICENSE-jcommander.txt
new file mode 100644
index 0000000000..477eb7b7ba
--- /dev/null
+++ b/licenses/inlong-tubemq-server/licenses/LICENSE-jcommander.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright 2012, Cedric Beust
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.