You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:07 UTC

[incubator-tubemq] 09/49: [TUBEMQ-444]Add consume and produce Cli commands (#343)

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 6ce60a894b1bed29d8bd1602d936cf9c32581b77
Author: gosonzhang <46...@qq.com>
AuthorDate: Wed Dec 9 15:38:34 2020 +0800

    [TUBEMQ-444]Add consume and produce Cli commands (#343)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 ...nsumer-perf-test.sh => tubemq-consumer-test.sh} |   2 +-
 ...oducer-perf-test.sh => tubemq-producer-test.sh} |   2 +-
 .../apache/tubemq/corebase/TErrCodeConstants.java  |   6 +
 .../apache/tubemq/corebase/utils/MixedUtils.java   |  59 +++
 tubemq-example/pom.xml                             |  10 -
 tubemq-example/src/main/assembly/assembly.xml      |   3 -
 .../apache/tubemq/example/ArgsParserHelper.java    |  48 ---
 .../tubemq/example/MAMessageProducerExample.java   | 100 ++----
 .../tubemq/example/MessageConsumerExample.java     | 125 +++----
 .../tubemq/server/common/fielddef/CliArgDef.java   |  70 ++--
 .../tubemq/server/tools/cli/CliAbstractBase.java   |  76 ++++
 .../tubemq/server/tools/cli/CliConsumer.java       | 394 +++++++++++++++++++++
 .../tubemq/server/tools/cli/CliProducer.java       | 385 ++++++++++++++++++++
 13 files changed, 1037 insertions(+), 243 deletions(-)

diff --git a/bin/tubemq-consumer-perf-test.sh b/bin/tubemq-consumer-test.sh
similarity index 94%
rename from bin/tubemq-consumer-perf-test.sh
rename to bin/tubemq-consumer-test.sh
index 54189a2..23292fb 100644
--- a/bin/tubemq-consumer-perf-test.sh
+++ b/bin/tubemq-consumer-test.sh
@@ -37,4 +37,4 @@ if [ -z "$BASE_DIR" ] ; then
   #echo "TubeMQ master is at $BASE_DIR"
 fi
 source $BASE_DIR/bin/env.sh
-$JAVA $TOOLS_ARGS org.apache.tubemq.example.MessageConsumerExample $@
+$JAVA $TOOLS_ARGS org.apache.tubemq.server.tools.cli.CliConsumer $@
diff --git a/bin/tubemq-producer-perf-test.sh b/bin/tubemq-producer-test.sh
similarity index 94%
rename from bin/tubemq-producer-perf-test.sh
rename to bin/tubemq-producer-test.sh
index ed76c3a..0f96033 100644
--- a/bin/tubemq-producer-perf-test.sh
+++ b/bin/tubemq-producer-test.sh
@@ -37,4 +37,4 @@ if [ -z "$BASE_DIR" ] ; then
   #echo "TubeMQ master is at $BASE_DIR"
 fi
 source $BASE_DIR/bin/env.sh
-$JAVA $TOOLS_ARGS org.apache.tubemq.example.MAMessageProducerExample $@
+$JAVA $TOOLS_ARGS org.apache.tubemq.server.tools.cli.CliProducer $@
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java
index 441af38..8bdafa0 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java
@@ -18,6 +18,9 @@
 package org.apache.tubemq.corebase;
 
 
+import java.util.Arrays;
+import java.util.List;
+
 public class TErrCodeConstants {
     public static final int SUCCESS = 200;
     public static final int NOT_READY = 201;
@@ -44,4 +47,7 @@ public class TErrCodeConstants {
     public static final int SERVICE_UNAVAILABLE = 503;
     public static final int INTERNAL_SERVER_ERROR_MSGSET_NULL = 510;
 
+    public static final List<Integer> IGNORE_ERROR_SET =
+            Arrays.asList(BAD_REQUEST, NOT_FOUND, ALL_PARTITION_FROZEN,
+                    NO_PARTITION_ASSIGNED, ALL_PARTITION_WAITING, ALL_PARTITION_INUSE);
 }
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
index 17ccb5e..bfbedad 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
@@ -17,6 +17,16 @@
 
 package org.apache.tubemq.corebase.utils;
 
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.tubemq.corebase.TokenConstants;
+
+
+
 public class MixedUtils {
     // java version cache
     private static String javaVersion = "";
@@ -33,4 +43,53 @@ public class MixedUtils {
             return javaVersion.substring(0, maxLen);
         }
     }
+
+    /**
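+     * Parse a topic parameter of the form topic_1[,topic_2[:filterCond_2.1[;filterCond_2.2]]]
+     * into a map from each topic name to its set of filter conditions.
+     * @param topicParam - composite topic string; a blank value yields an empty map
+     * @return - map of topic->set(filterCond); the set is empty for a topic without filters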
+     */
+    public static Map<String, TreeSet<String>> parseTopicParam(String topicParam) {
+        Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
+        if (TStringUtils.isBlank(topicParam)) {
+            return topicAndFiltersMap;
+        }
+        String[] topicFilterStrs = topicParam.split(TokenConstants.ARRAY_SEP);
+        for (String topicFilterStr : topicFilterStrs) {
+            if (TStringUtils.isBlank(topicFilterStr)) {
+                continue;
+            }
+            String[] topicFilters = topicFilterStr.split(TokenConstants.ATTR_SEP);
+            if (TStringUtils.isBlank(topicFilters[0])) {
+                continue;
+            }
+            TreeSet<String> filterSet = new TreeSet<>();
+            if (topicFilters.length > 1
+                    && TStringUtils.isNotBlank(topicFilters[1])) {
+                String[] filterItems = topicFilters[1].split(TokenConstants.LOG_SEG_SEP);
+                for (String filterItem : filterItems) {
+                    if (TStringUtils.isBlank(filterItem)) {
+                        continue;
+                    }
+                    filterSet.add(filterItem.trim());
+                }
+            }
+            topicAndFiltersMap.put(topicFilters[0].trim(), filterSet);
+        }
+        return topicAndFiltersMap;
+    }
+
+    public static byte[] buildTestData(int bodySize) {
+        final byte[] transmitData =
+                StringUtils.getBytesUtf8("This is a test data!");
+        final ByteBuffer dataBuffer = ByteBuffer.allocate(bodySize);
+        while (dataBuffer.hasRemaining()) {
+            int offset = dataBuffer.arrayOffset();
+            dataBuffer.put(transmitData, offset,
+                    Math.min(dataBuffer.remaining(), transmitData.length));
+        }
+        dataBuffer.flip();
+        return dataBuffer.array();
+    }
 }
diff --git a/tubemq-example/pom.xml b/tubemq-example/pom.xml
index 319acab..9ab6775 100644
--- a/tubemq-example/pom.xml
+++ b/tubemq-example/pom.xml
@@ -65,16 +65,6 @@
             <groupId>org.apache.tubemq</groupId>
             <artifactId>tubemq-client</artifactId>
         </dependency>
-        <dependency>
-            <groupId>commons-cli</groupId>
-            <artifactId>commons-cli</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>${junit.version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
 </project>
diff --git a/tubemq-example/src/main/assembly/assembly.xml b/tubemq-example/src/main/assembly/assembly.xml
index 596af24..11edd32 100644
--- a/tubemq-example/src/main/assembly/assembly.xml
+++ b/tubemq-example/src/main/assembly/assembly.xml
@@ -32,9 +32,6 @@
             <directory>../</directory>
             <includes>
                 <include>./conf/tools.log4j.properties</include>
-                <include>./bin/tubemq-consumer-perf-test.sh</include>
-                <include>./bin/tubemq-producer-perf-test.sh</include>
-                <include>./bin/env.sh</include>
                 <include>LICENSE</include>
                 <include>NOTICE</include>
                 <include>DISCLAIMER-WIP</include>
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java
deleted file mode 100644
index c507ae4..0000000
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.example;
-
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-
-public class ArgsParserHelper {
-
-    /**
-     * Print help information and exit.
-     *
-     * @param opts - options
-     */
-    public static void help(String commandName, Options opts) {
-        HelpFormatter formatter = new HelpFormatter();
-        formatter.printHelp(commandName, opts);
-        System.exit(0);
-    }
-
-    /**
-     * Init common options when parsing args.
-     * @return - options
-     */
-    public static Options initCommonOptions() {
-        Options options = new Options();
-        options.addOption(null, "help", false, "show help");
-        options.addOption(null, "master-list", true, "master address like: host1:8000,host2:8000");
-        options.addOption(null, "topic", true, "topic list, topic1,topic2 or "
-                + "topic1:tid11;tid12,topic2:tid21;tid22(consumer only)");
-        return options;
-    }
-}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
index 9c76ce1..f76b077 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
@@ -30,13 +30,8 @@ import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Options;
 import org.apache.commons.codec.binary.StringUtils;
 import org.apache.tubemq.client.config.TubeClientConfig;
 import org.apache.tubemq.client.exception.TubeClientException;
@@ -64,7 +59,7 @@ public class MAMessageProducerExample {
     private static final int SESSION_FACTORY_NUM = 10;
 
     private static Set<String> topicSet;
-    private static int batchCount;
+    private static int msgCount;
     private static int producerCount;
     private static byte[] sendData;
 
@@ -94,72 +89,45 @@ public class MAMessageProducerExample {
         }
     }
 
-    /**
-     * Init options
-     *
-     * @return options
-     */
-    public static Options initOptions() {
-        Options options = ArgsParserHelper.initCommonOptions();
-        options.addOption(null, "batch-size", true, "number of messages in single batch, default is 100000");
-        options.addOption(null, "max-batch", true, "max batch number, default is 1024");
-        options.addOption(null, "thread-num", true, "thread number of producers, default is 1, max is 100");
-        return options;
-    }
-
     public static void main(String[] args) {
-        Options options = null;
-        try {
-            CommandLineParser parser = new DefaultParser();
-            options = initOptions();
-            CommandLine cl = parser.parse(options, args);
-            if (cl != null) {
-                final String masterHostAndPort = cl.getOptionValue("master-list");
-                final String topics = cl.getOptionValue("topic");
-                final List<String> topicList = Arrays.asList(topics.split(","));
-                topicSet = new TreeSet<>(topicList);
-
-                batchCount = Integer.parseInt(cl.getOptionValue("max-batch", "100000"));
-                int batchSize = Integer.parseInt(cl.getOptionValue("batch-size", "1024"));
-                producerCount = Math.min(Integer.parseInt(cl.getOptionValue(
-                        "thread-num", "1")), MAX_PRODUCER_NUM);
-                logger.info("MAMessageProducerExample.main started...");
-                final byte[] transmitData = StringUtils
-                        .getBytesUtf8("This is a test message from multi-session factory.");
-                final ByteBuffer dataBuffer = ByteBuffer.allocate(batchSize);
-
-                while (dataBuffer.hasRemaining()) {
-                    int offset = dataBuffer.arrayOffset();
-                    dataBuffer.put(transmitData, offset,
-                            Math.min(dataBuffer.remaining(), transmitData.length));
-                }
+        final String masterHostAndPort = args[0];
 
-                dataBuffer.flip();
-                sendData = dataBuffer.array();
+        final String topics = args[1];
+        final List<String> topicList = Arrays.asList(topics.split(","));
 
-                try {
-                    MAMessageProducerExample messageProducer = new MAMessageProducerExample(
-                            masterHostAndPort);
+        topicSet = new TreeSet<>(topicList);
 
-                    messageProducer.startService();
+        msgCount = Integer.parseInt(args[2]);
+        producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
 
-                    while (SENT_SUCC_COUNTER.get() < (long) batchCount * producerCount * topicSet.size()) {
-                        TimeUnit.MILLISECONDS.sleep(1000);
-                    }
-                    messageProducer.producerMap.clear();
-                    messageProducer.shutdown();
+        logger.info("MAMessageProducerExample.main started...");
 
-                } catch (TubeClientException e) {
-                    logger.error("TubeClientException: ", e);
-                } catch (Throwable e) {
-                    logger.error("Throwable: ", e);
-                }
-            }
-        } catch (Exception ex) {
-            logger.error(ex.getMessage());
-            if (options != null) {
-                ArgsParserHelper.help("./tubemq-producer-perf-test.sh", options);
+        final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
+        final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
+
+        while (dataBuffer.hasRemaining()) {
+            int offset = dataBuffer.arrayOffset();
+            dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length));
+        }
+
+        dataBuffer.flip();
+        sendData = dataBuffer.array();
+
+        try {
+            MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort);
+            messageProducer.startService();
+
+            // Widen to long before multiplying so a large target count cannot overflow int.
+            while (SENT_SUCC_COUNTER.get() < (long) msgCount * producerCount * topicSet.size()) {
+                Thread.sleep(1000);
             }
+            messageProducer.producerMap.clear();
+            messageProducer.shutdown();
+
+        } catch (TubeClientException e) {
+            logger.error("TubeClientException: ", e);
+        } catch (Throwable e) {
+            logger.error("Throwable: ", e);
         }
     }
 
@@ -205,7 +173,7 @@ public class MAMessageProducerExample {
             } catch (Throwable t) {
                 logger.error("publish exception: ", t);
             }
-            for (int i = 0; i < batchCount; i++) {
+            for (int i = 0; i < msgCount; i++) {
                 long millis = System.currentTimeMillis();
                 for (String topic : topicSet) {
                     try {
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
index 3b99943..d9aeb8a 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
@@ -26,10 +26,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Options;
 import org.apache.tubemq.client.common.PeerInfo;
 import org.apache.tubemq.client.config.ConsumerConfig;
 import org.apache.tubemq.client.consumer.ConsumePosition;
@@ -63,45 +59,29 @@ public final class MessageConsumerExample {
     private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
 
     private final PushMessageConsumer messageConsumer;
+    private final MessageSessionFactory messageSessionFactory;
 
-    public MessageConsumerExample(String masterHostAndPort, String group,
-            int fetchCount, boolean isFromBegin) throws Exception {
+    public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
-        if (isFromBegin) {
-            consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
-        } else {
-            consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
-        }
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
         if (fetchCount > 0) {
             consumerConfig.setPushFetchThreadCnt(fetchCount);
         }
-        MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+        this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
     }
 
-    /**
-     * Init options
-     * @return options
-     */
-    public static Options initOptions() {
-
-        Options options = ArgsParserHelper.initCommonOptions();
-        options.addOption(null, "batch-size", true, "max number of fetching message in one batch");
-        options.addOption(null, "thread-num", true, "thread number of consumers");
-        options.addOption(null, "group", true, "consumer group");
-        options.addOption(null, "from-begin", false, "default is consuming from latest, "
-                + "if option is clarified, then consume from begin");
-        return options;
-
-    }
+    public static void main(String[] args) {
+        final String masterHostAndPort = args[0];
+        final String topics = args[1];
+        final String group = args[2];
+        final int consumerCount = Integer.parseInt(args[3]);
+        int fetchCount = -1;
+        if (args.length > 5) {
+            fetchCount = Integer.parseInt(args[4]);
+        }
+        final Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
 
-    /**
-     * init topic->set(tid) map
-     * @param topics - topics string
-     * @return - map of topic->set(tid)
-     */
-    private static Map<String, TreeSet<String>> initTopicList(String topics) {
-        Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
         String[] topicTidsList = topics.split(",");
         for (String topicTids : topicTidsList) {
             String[] topicTidStr = topicTids.split(":");
@@ -115,60 +95,35 @@ public final class MessageConsumerExample {
             }
             topicTidsMap.put(topicTidStr[0], tids);
         }
-        return topicTidsMap;
-    }
+        final int startFetchCount = fetchCount;
+        // Size the pool by consumerCount, one worker per submitted consumer task.
+        // fetchCount stays -1 when its optional argument is absent, and
+        // Executors.newFixedThreadPool rejects a non-positive thread count.
+        final ExecutorService executorService = Executors.newFixedThreadPool(consumerCount);
+        for (int i = 0; i < consumerCount; i++) {
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        MessageConsumerExample messageConsumer = new MessageConsumerExample(
+                                masterHostAndPort, group, startFetchCount);
+                        messageConsumer.subscribe(topicTidsMap);
+                    } catch (Exception e) {
+                        logger.error("Create consumer failed!", e);
+                    }
+                }
+            });
+        }
+        final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread");
+        statisticThread.start();
 
-    public static void main(String[] args) {
-        Options options = null;
+        executorService.shutdown();
         try {
-            CommandLineParser parser = new DefaultParser();
-            options = initOptions();
-            CommandLine cl = parser.parse(options, args);
-            if (cl != null) {
-                final String masterHostAndPort = cl.getOptionValue("master-list");
-                final Map<String, TreeSet<String>> topicTidsMap = initTopicList(
-                        cl.getOptionValue("topic"));
-                final String group = cl.getOptionValue("group");
-                int threadNum = Integer.parseInt(cl.getOptionValue("thread-num", "1"));
-                final int fetchCount = Integer.parseInt(cl.getOptionValue("batch-size", "-1"));
-                final boolean isFromBegin = cl.hasOption("from-begin");
-                ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
-                for (int i = 0; i < threadNum; i++) {
-                    executorService.submit(new Runnable() {
-                        @Override
-                        public void run() {
-                            try {
-                                MessageConsumerExample messageConsumer = new MessageConsumerExample(
-                                        masterHostAndPort,
-                                        group,
-                                        fetchCount,
-                                        isFromBegin
-                                );
-                                messageConsumer.subscribe(topicTidsMap);
-                            } catch (Exception e) {
-                                logger.error("Create consumer failed!", e);
-                            }
-                        }
-                    });
-                }
-                final Thread statisticThread = new Thread(msgRecvStats,
-                        "Received Statistic Thread");
-                statisticThread.start();
-
-                executorService.shutdown();
-                try {
-                    executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    logger.error("Thread Pool shutdown has been interrupted!");
-                }
-                msgRecvStats.stopStats();
-            }
-        } catch (Exception ex) {
-            logger.error(ex.getMessage(), ex.getMessage());
-            if (options != null) {
-                ArgsParserHelper.help("./tubemq-consumer-perf-test.sh", options);
-            }
+            executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            logger.error("Thread Pool shutdown has been interrupted!");
         }
+        msgRecvStats.stopStats();
     }
 
     public void subscribe(Map<String, TreeSet<String>> topicTidsMap) throws TubeClientException {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java
index b2d9327..abb2e2a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java
@@ -27,64 +27,76 @@ public enum CliArgDef {
 
     HELP("h", "help", "Print usage information."),
     VERSION("v", "version", "Display TubeMQ version."),
-    MASTERSERVER("master-servers", "master-servers",
+    MASTERSERVER(null, "master-servers",
             "String: format is master1_ip:port[,master2_ip:port]",
             "The master address(es) to connect to."),
-    MASTERURL("master-url", "master-url",
+    MASTERURL(null, "master-url",
             "String: format is http://master_ip:master_webport/",
             "Master Service URL to which to connect.(default: http://localhost:8080/)"),
-    BROKERURL("broker-url", "broker-url",
+    BROKERURL(null, "broker-url",
             "String: format is http://broker_ip:broker_webport/",
             "Broker Service URL to which to connect.(default: http://localhost:8081/)"),
-    MESSAGES("messages", "messages",
+    MESSAGES(null, "messages",
             "Long: count",
             "The number of messages to send or consume, If not set, production or consumption is continual."),
-    MSGDATASIZE("msg-data-size", "message-data-size",
-            "Int: message size",
+    MSGDATASIZE(null, "message-data-size",
+            "Int: message size,(0, 1024 * 1024)",
             "message's data size in bytes. Note that you must provide exactly"
                     + " one of --msg-data-size or --payload-file."),
-    PAYLOADFILE("payload-file", "payload-file",
+    PAYLOADFILE(null, "payload-file",
             "String: payload file path",
             "file to read the message payloads from. This works only for"
                     + " UTF-8 encoded text files. Payloads will be read from this"
                     + " file and a payload will be randomly selected when sending"
                     + " messages. Note that you must provide exactly one"
                     + " of --msg-data-size or --payload-file."),
-    PAYLOADDELIM("payload-delimiter", "payload-delimiter",
+    PAYLOADDELIM(null, "payload-delimiter",
             "String: payload data's delimiter",
             "provides delimiter to be used when --payload-file is provided."
                     + " Defaults to new line. Note that this parameter will be"
                     + " ignored if --payload-file is not provided. (default: \\n)"),
     PRDTOPIC("topic", "topicName",
-            "String: topic, format is topic_1[,topic_2[:filterCond_2.1[;filterCond_2.2]]]",
+            "String: topic, format is topic_1[,topic_2[:filterCond_2.1[\\;filterCond_2.2]]]",
             "The topic(s) to produce messages to."),
     CNSTOPIC("topic", "topicName",
-            "String: topic, format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]",
+            "String: topic, format is topic_1[[:filterCond_1.1[\\;filterCond_1.2]][,topic_2]]",
             "The topic(s) to consume on."),
-    RPCTIMEOUT("timeout", "timeout",
+    RPCTIMEOUT(null, "rpc-timeout",
             "Long: milliseconds",
             "The maximum duration between request and response in milliseconds. (default: 10000)"),
+    CONNREUSE(null, "conn-reuse",
+            "bool: true or false",
+            "Different clients reuse TCP connections. (default: true)"),
     GROUP("group", "groupName",
             "String: consumer group",
-            "The consumer group name of the consumer."),
-    CLIENTCOUNT("client-num", "client-num",
-            "Int: client count",
-            "Number of consumers to started."),
-    PULLMODEL("pull-model", "pull-model",
-            "Pull consumption model."),
-    PUSHMODEL("push-model", "push-model",
-            "Push consumption model."),
-    FETCHTHREADS("num-fetch-threads", "num-fetch-threads",
-            "Integer: count",
+            "The consumer group name of the consumer. (default: test_consume)"),
+    CLIENTCOUNT(null, "client-count",
+            "Int: client count, [1, 100]",
+            "Number of producers or consumers to started."),
+    PUSHCONSUME(null, "consume-push",
+            "Push consumption action.(default: pull)"),
+    FETCHTHREADS(null, "num-fetch-threads",
+            "Integer: count, [1,100]",
             "Number of fetch threads, default: num of cpu count."),
-    FROMLATEST("from-latest", "from-latest",
-            "Start to consume from the latest message present in the log."),
-    FROMBEGINNING("from-beginning", "from-beginning",
-            "If the consumer does not already have an established offset to consume from,"
-                    + " start with the earliest message present in the log rather than the latest message."),
-    OUTPUTINTERVAL("output-interval", "output-interval",
-            "Integer: interval_ms",
-            "Interval in milliseconds at which to print progress info. (default: 5000)");
+    SENDTHREADS(null, "num-send-threads",
+            "Integer: count, [1,200]",
+            "Number of send message threads, default: num of cpu count."),
+    CONSUMEPOS(null, "consume-position",
+            "Integer: [-1,0, 1]",
+            "Set the start position of the consumer group. The value can be [-1, 0, 1]."
+                    + " Default value is 0. -1: Start from 0 for the first time."
+                    + " Otherwise start from last consume position."
+                    + " 0: Start from the latest position for the first time."
+                    + " Otherwise start from last consume position."
+                    + " 1: Start from the latest consume position."),
+    OUTPUTINTERVAL(null, "output-interval",
+            "Integer: interval_ms, [5000, +)",
+            "Interval in milliseconds at which to print progress info. (default: 5000)"),
+    SYNCPRODUCE(null, "sync-produce",
+            "Synchronous production. (default: false)"),
+    WITHOUTDELAY(null, "without-delay",
+                        "Production without delay. (default: false)");
+
 
 
     CliArgDef(String opt, String longOpt, String optDesc) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliAbstractBase.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliAbstractBase.java
new file mode 100644
index 0000000..c0903f2
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliAbstractBase.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.tools.cli;
+
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.tubemq.server.common.TubeServerVersion;
+import org.apache.tubemq.server.common.fielddef.CliArgDef;
+
+
+public abstract class CliAbstractBase {
+
+    protected final String commandName;    // passed to the help formatter as the command syntax
+    protected Options options = new Options();
+    protected CommandLineParser parser = new DefaultParser();
+    private HelpFormatter formatter = new HelpFormatter();
+
+    public CliAbstractBase(String commandName) {
+        this.commandName = commandName;
+        addCommandOption(CliArgDef.HELP);       // options shared by every command
+        addCommandOption(CliArgDef.VERSION);
+        formatter.setWidth(500);
+    }
+
+    /**
+     * Print help information and exit.
+     * The JVM is terminated with status 0, so this method never returns.
+     */
+    public void help() {
+        formatter.printHelp(commandName, options);
+        System.exit(0);
+    }
+
+    /**
+     * Print the TubeMQ server version and exit.
+     * The JVM is terminated with status 0, so this method never returns.
+     */
+    public void version() {
+        System.out.println("TubeMQ " + TubeServerVersion.BROKER_VERSION);
+        System.exit(0);
+    }
+
+    public void addCommandOption(CliArgDef cliArgDef) {
+        Option option = new Option(cliArgDef.opt,
+                cliArgDef.longOpt, cliArgDef.hasArg, cliArgDef.optDesc);
+        if (cliArgDef.hasArg) {   // only options that take a value get an argument name
+            option.setArgName(cliArgDef.argDesc);
+        }
+        options.addOption(option);
+    }
+
+    // Register the options that are specific to the concrete command.
+    protected abstract void initCommandOptions();
+
+    // Parse the command-line arguments of the concrete command.
+    public abstract boolean parseParams(String[] args) throws Exception;
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java
new file mode 100644
index 0000000..666e6f9
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.tools.cli;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
+import org.apache.tubemq.client.common.PeerInfo;
+import org.apache.tubemq.client.config.ConsumerConfig;
+import org.apache.tubemq.client.consumer.ConsumePosition;
+import org.apache.tubemq.client.consumer.ConsumerResult;
+import org.apache.tubemq.client.consumer.MessageConsumer;
+import org.apache.tubemq.client.consumer.MessageV2Listener;
+import org.apache.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.tubemq.client.consumer.PushMessageConsumer;
+import org.apache.tubemq.client.exception.TubeClientException;
+import org.apache.tubemq.client.factory.MessageSessionFactory;
+import org.apache.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.TErrCodeConstants;
+import org.apache.tubemq.corebase.utils.MixedUtils;
+import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.server.common.fielddef.CliArgDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * This class is used to run the CLI consumer process.
+ *
+ *
+ */
+public class CliConsumer extends CliAbstractBase {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(CliConsumer.class);
+    // statistic data index
+    private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
+    // subscribed topics with their filter items, and the created client objects
+    private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
+    private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
+    private final Map<MessageConsumer, TupleValue> consumerMap = new HashMap<>();
+    // cli parameters
+    private String masterServers;
+    private String groupName = "test_consume";
+    private ConsumePosition consumePos =
+            ConsumePosition.CONSUMER_FROM_LATEST_OFFSET;
+    private long msgCount = TBaseConstants.META_VALUE_UNDEFINED;
+    private long rpcTimeoutMs = TBaseConstants.META_VALUE_UNDEFINED;
+    private boolean reuseConn = false;
+    private int clientCount = 1;
+    private int fetchThreadCnt =
+            Runtime.getRuntime().availableProcessors();
+    private long printIntervalMs = 5000;
+    private boolean isPushConsume = false;
+
+    private boolean isStarted = false;
+
+
+    public CliConsumer() {
+        super("tubemq-consumer-test.sh");
+        initCommandOptions();
+    }
+
+    /**
+     * Register the command-line options accepted by the consumer command.
+     */
+    protected void initCommandOptions() {
+        // help and version are already registered by the base-class constructor
+        addCommandOption(CliArgDef.MASTERSERVER);
+        addCommandOption(CliArgDef.MESSAGES);
+        addCommandOption(CliArgDef.CNSTOPIC);
+        addCommandOption(CliArgDef.RPCTIMEOUT);
+        addCommandOption(CliArgDef.GROUP);
+        addCommandOption(CliArgDef.CONNREUSE);
+        addCommandOption(CliArgDef.PUSHCONSUME);
+        addCommandOption(CliArgDef.CONSUMEPOS);
+        addCommandOption(CliArgDef.FETCHTHREADS);
+        addCommandOption(CliArgDef.CLIENTCOUNT);
+        addCommandOption(CliArgDef.OUTPUTINTERVAL);
+        addCommandOption(CliArgDef.WITHOUTDELAY); // NOTE(review): never read by parseParams()
+    }
+
+    /**
+     * Parse the command-line arguments and store the checked values in this instance.
+     * @param args the raw command-line arguments
+     * @return true once every supplied value has been accepted
+     * @throws Exception if a required option is missing or a value is not acceptable
+     */
+    public boolean parseParams(String[] args) throws Exception {
+        CommandLine cli = parser.parse(options, args);
+        if (cli == null) {
+            throw new ParseException("Parse args failure");
+        }
+        if (cli.hasOption(CliArgDef.VERSION.longOpt)) {
+            version();    // prints the version and exits the JVM
+        }
+        if (cli.hasOption(CliArgDef.HELP.longOpt)) {
+            help();       // prints the usage text and exits the JVM
+        }
+        masterServers = cli.getOptionValue(CliArgDef.MASTERSERVER.longOpt);
+        if (TStringUtils.isBlank(masterServers)) {
+            throw new Exception(CliArgDef.MASTERSERVER.longOpt + " is required!");
+        }
+        String topicStr = cli.getOptionValue(CliArgDef.CNSTOPIC.longOpt);
+        if (TStringUtils.isBlank(topicStr)) {
+            throw new Exception(CliArgDef.CNSTOPIC.longOpt + " is required!");
+        }
+        topicAndFiltersMap.putAll(MixedUtils.parseTopicParam(topicStr));
+        if (topicAndFiltersMap.isEmpty()) {
+            throw new Exception("Invalid " + CliArgDef.CNSTOPIC.longOpt + " parameter value!");
+        }
+        String msgCntStr = cli.getOptionValue(CliArgDef.MESSAGES.longOpt);
+        if (TStringUtils.isNotBlank(msgCntStr)) {
+            msgCount = Long.parseLong(msgCntStr);
+        }
+        String groupNameStr = cli.getOptionValue(CliArgDef.GROUP.longOpt);
+        if (TStringUtils.isNotBlank(groupNameStr)) {
+            groupName = groupNameStr;
+        }
+        String reuseConnStr = cli.getOptionValue(CliArgDef.CONNREUSE.longOpt);
+        if (TStringUtils.isNotBlank(reuseConnStr)) {
+            reuseConn = Boolean.parseBoolean(reuseConnStr);
+        }
+        String rpcTimeoutStr = cli.getOptionValue(CliArgDef.RPCTIMEOUT.longOpt);
+        if (TStringUtils.isNotBlank(rpcTimeoutStr)) {
+            rpcTimeoutMs = Long.parseLong(rpcTimeoutStr);
+        }
+        String clientCntStr = cli.getOptionValue(CliArgDef.CLIENTCOUNT.longOpt);
+        if (TStringUtils.isNotBlank(clientCntStr)) {
+            clientCount = Math.max(1, Integer.parseInt(clientCntStr));  // at least one client
+        }
+        String printIntMsStr = cli.getOptionValue(CliArgDef.OUTPUTINTERVAL.longOpt);
+        if (TStringUtils.isNotBlank(printIntMsStr)) {
+            printIntervalMs = Long.parseLong(printIntMsStr);
+            if (printIntervalMs < 5000) {
+                throw new Exception("Invalid " + CliArgDef.OUTPUTINTERVAL.longOpt
+                        + " parameter value!");
+            }
+        }
+        String consumePosStr = cli.getOptionValue(CliArgDef.CONSUMEPOS.longOpt);
+        if (TStringUtils.isNotBlank(consumePosStr)) {
+            int tmpPosId = Integer.parseInt(consumePosStr);
+            if (tmpPosId > 0) {
+                consumePos = ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS;
+            } else if (tmpPosId < 0) {
+                consumePos = ConsumePosition.CONSUMER_FROM_FIRST_OFFSET;
+            } else {
+                consumePos = ConsumePosition.CONSUMER_FROM_LATEST_OFFSET;
+            }
+        }
+        isPushConsume = cli.hasOption(CliArgDef.PUSHCONSUME.longOpt);
+        String fetchThreadCntStr = cli.getOptionValue(CliArgDef.FETCHTHREADS.longOpt);
+        if (TStringUtils.isNotBlank(fetchThreadCntStr)) {
+            fetchThreadCnt = Math.max(1, Math.min(Integer.parseInt(fetchThreadCntStr), 100));
+        }
+        return true;
+    }
+
+    // Create, subscribe and start every consumer; call after parseParams() has succeeded.
+    public void initTask() throws Exception {
+        // initial consumer configure
+        ConsumerConfig consumerConfig = new ConsumerConfig(masterServers, groupName);
+        consumerConfig.setRpcTimeoutMs(rpcTimeoutMs);
+        consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
+        consumerConfig.setConsumePosition(consumePos);
+        // TOTAL_COUNTER is shared by all clients: fetch threads stop at the overall target
+        final long totalMsgCount = msgCount * clientCount;
+        // initial consumer object
+        if (isPushConsume) {
+            DefaultMessageListener msgListener = new DefaultMessageListener();
+            if (reuseConn) {
+                // if reuse connection, need use TubeSingleSessionFactory class
+                MessageSessionFactory msgSessionFactory =
+                        new TubeSingleSessionFactory(consumerConfig);
+                this.sessionFactoryList.add(msgSessionFactory);
+                for (int i = 0; i < clientCount; i++) {
+                    PushMessageConsumer consumer1 =
+                            msgSessionFactory.createPushConsumer(consumerConfig);
+                    for (Map.Entry<String, TreeSet<String>> entry
+                            : topicAndFiltersMap.entrySet()) {
+                        consumer1.subscribe(entry.getKey(), entry.getValue(), msgListener);
+                    }
+                    consumer1.completeSubscribe();
+                    consumerMap.put(consumer1, null);    // push mode needs no fetch threads
+                }
+            } else {
+                for (int i = 0; i < clientCount; i++) {
+                    MessageSessionFactory msgSessionFactory =
+                            new TubeMultiSessionFactory(consumerConfig);
+                    this.sessionFactoryList.add(msgSessionFactory);
+                    PushMessageConsumer consumer1 =
+                            msgSessionFactory.createPushConsumer(consumerConfig);
+                    for (Map.Entry<String, TreeSet<String>> entry
+                            : topicAndFiltersMap.entrySet()) {
+                        consumer1.subscribe(entry.getKey(), entry.getValue(), msgListener);
+                    }
+                    consumer1.completeSubscribe();
+                    consumerMap.put(consumer1, null);    // push mode needs no fetch threads
+                }
+            }
+        } else {
+            if (reuseConn) {
+                MessageSessionFactory msgSessionFactory =
+                        new TubeSingleSessionFactory(consumerConfig);
+                this.sessionFactoryList.add(msgSessionFactory);
+                for (int i = 0; i < clientCount; i++) {
+                    PullMessageConsumer consumer2 =
+                            msgSessionFactory.createPullConsumer(consumerConfig);
+                    for (Map.Entry<String, TreeSet<String>> entry
+                            : topicAndFiltersMap.entrySet()) {
+                        consumer2.subscribe(entry.getKey(), entry.getValue());
+                    }
+                    consumer2.completeSubscribe();
+                    consumerMap.put(consumer2,
+                            new TupleValue(consumer2, totalMsgCount, fetchThreadCnt));
+                }
+            } else {
+                for (int i = 0; i < clientCount; i++) {
+                    MessageSessionFactory msgSessionFactory =
+                            new TubeMultiSessionFactory(consumerConfig);
+                    this.sessionFactoryList.add(msgSessionFactory);
+                    PullMessageConsumer consumer2 =
+                            msgSessionFactory.createPullConsumer(consumerConfig);
+                    for (Map.Entry<String, TreeSet<String>> entry
+                            : topicAndFiltersMap.entrySet()) {
+                        consumer2.subscribe(entry.getKey(), entry.getValue());
+                    }
+                    consumer2.completeSubscribe();
+                    consumerMap.put(consumer2,
+                            new TupleValue(consumer2, totalMsgCount, fetchThreadCnt));
+                }
+            }
+        }
+        isStarted = true;
+    }
+
+    public void shutdown() throws Throwable {
+        // brief pause, then close every consumer before the session factories
+        ThreadUtils.sleep(20);
+        for (MessageConsumer consumer : consumerMap.keySet()) {
+            consumer.shutdown();
+        }
+        for (MessageSessionFactory messageSessionFactory : sessionFactoryList) {
+            messageSessionFactory.shutdown();
+        }
+    }
+
+
+    private static class TupleValue {
+        public Thread[] fetchRunners = null;
+
+        // Creates fetchThreadCnt fetch threads for the consumer, then starts all of them.
+        public TupleValue(PullMessageConsumer consumer, long msgCount, int fetchThreadCnt) {
+            fetchRunners = new Thread[fetchThreadCnt];
+            for (int idx = 0; idx < fetchRunners.length; idx++) {
+                fetchRunners[idx] = new Thread(
+                        new FetchRequestRunner(consumer, msgCount), "_fetch_runner_" + idx);
+            }
+            for (Thread runner : fetchRunners) {
+                runner.start();
+            }
+        }
+    }
+
+
+    // push consumer callback: adds the size of every delivered batch to TOTAL_COUNTER
+    private static class DefaultMessageListener implements MessageV2Listener {
+
+        public DefaultMessageListener() {
+        }
+
+        @Override
+        public void receiveMessages(PeerInfo peerInfo, List<Message> messages) {
+            if (messages != null && !messages.isEmpty()) {
+                TOTAL_COUNTER.addAndGet(messages.size());
+            }
+        }
+
+        @Override
+        public void receiveMessages(List<Message> messages) {
+            // deprecated callback; messages handed to it are not counted
+        }
+
+        @Override
+        public Executor getExecutor() {
+            return null;    // NOTE(review): presumably the client then uses its own threads - confirm
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+
+    // for pull consumer process: fetch, count and confirm messages until told to stop
+    private static class FetchRequestRunner implements Runnable {
+
+        private final PullMessageConsumer messageConsumer;
+        private final long msgConsumeCnt;    // stop threshold for TOTAL_COUNTER; negative: no limit
+
+        FetchRequestRunner(PullMessageConsumer messageConsumer, long msgConsumeCnt) {
+            this.messageConsumer = messageConsumer;
+            this.msgConsumeCnt = msgConsumeCnt;
+        }
+
+        @Override
+        public void run() {
+            try {
+                do {
+                    ConsumerResult result = messageConsumer.getMessage();
+                    if (result.isSuccess()) {
+                        List<Message> messageList = result.getMessageList();
+                        if (messageList != null && !messageList.isEmpty()) {
+                            TOTAL_COUNTER.addAndGet(messageList.size());
+                        }
+                        messageConsumer.confirmConsume(result.getConfirmContext(), true);
+                    } else {
+                        if (!TErrCodeConstants.IGNORE_ERROR_SET.contains(result.getErrCode())) {
+                            logger.info(
+                                    "Receive messages errorCode is {}, Error message is {}",
+                                    result.getErrCode(),
+                                    result.getErrMsg());
+                        }
+                        // leave on any failure once the consumer is closed, ignored codes included
+                        if (messageConsumer.isShutdown()) {
+                            break;
+                        }
+                    }
+                    // the counter is shared, so the limit applies to all fetch threads together
+                    if (msgConsumeCnt >= 0 && TOTAL_COUNTER.get() >= msgConsumeCnt) {
+                        break;
+                    }
+                } while (true);
+            } catch (TubeClientException e) {
+                logger.error("Consume messages failed!", e);
+            }
+        }
+    }
+
+    // Entry point: parse the arguments, start the consumers, then report progress
+    // every printIntervalMs until the required message count has been received.
+    public static void main(String[] args) {
+        final CliConsumer consumerCli = new CliConsumer();
+        try {
+            if (!consumerCli.parseParams(args)) {
+                throw new Exception("Parse parameters failure!");
+            }
+            consumerCli.initTask();
+            ThreadUtils.sleep(1000);
+            // neither factor changes after parseParams(), so the target is computed once
+            final long requiredCnt = consumerCli.msgCount * consumerCli.clientCount;
+            while (consumerCli.msgCount < 0 || TOTAL_COUNTER.get() < requiredCnt) {
+                ThreadUtils.sleep(consumerCli.printIntervalMs);
+                System.out.println("Required received count VS received message count = "
+                        + requiredCnt + " : " + TOTAL_COUNTER.get());
+            }
+            consumerCli.shutdown();
+            System.out.println("Finished, received count VS received message count = "
+                    + requiredCnt + " : " + TOTAL_COUNTER.get());
+        } catch (Throwable ex) {
+            // help() terminates the JVM after printing the usage text
+            ex.printStackTrace();
+            logger.error(ex.getMessage());
+            consumerCli.help();
+        }
+    }
+
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
new file mode 100644
index 0000000..2734ac1
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.tools.cli;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
+import org.apache.tubemq.client.config.TubeClientConfig;
+import org.apache.tubemq.client.factory.MessageSessionFactory;
+import org.apache.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.tubemq.client.producer.MessageProducer;
+import org.apache.tubemq.client.producer.MessageSentCallback;
+import org.apache.tubemq.client.producer.MessageSentResult;
+import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.MixedUtils;
+import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.server.common.fielddef.CliArgDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class is used to run the CLI producer process.
+ *
+ *
+ */
+public class CliProducer extends CliAbstractBase {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(CliProducer.class);
+    // statistic data index
+    private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
+    private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
+    private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0);
+    private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0);
+    // sent data content
+    private static byte[] sentData;
+    private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
+    private final List<TupleValue> topicSendRounds = new ArrayList<>();
+    private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
+    private final Map<MessageProducer, MsgSender> producerMap = new HashMap<>();
+    // cli parameters
+    private String masterServers;
+    private long msgCount = TBaseConstants.META_VALUE_UNDEFINED;
+    private boolean useRandData = true;
+    private int msgDataSize = 1000;
+    private String payloadFilePath = null;
+    private String payloadDelim = null;
+    private long rpcTimeoutMs = TBaseConstants.META_VALUE_UNDEFINED;
+    private boolean reuseConn = false;
+    private int clientCount = 1;
+    private int sendThreadCnt = 100;
+    private long printIntervalMs = 5000;
+    private boolean syncProduction = false;
+    private boolean withoutDelay = false;
+    private boolean isStarted = false;
+    private ExecutorService sendExecutorService = null;
+
+
+    public CliProducer() {
+        super("tubemq-producer-test.sh");
+        initCommandOptions();
+    }
+
+    /**
+     * Register the command-line options accepted by the producer command.
+     */
+    protected void initCommandOptions() {
+        // help and version are already registered by the base-class constructor
+        addCommandOption(CliArgDef.MASTERSERVER);
+        addCommandOption(CliArgDef.MESSAGES);
+        addCommandOption(CliArgDef.MSGDATASIZE);
+        //addCommandOption(CliArgDef.PAYLOADFILE);    (disabled: parseParams() reads no payload option)
+        //addCommandOption(CliArgDef.PAYLOADDELIM);
+        addCommandOption(CliArgDef.PRDTOPIC);
+        addCommandOption(CliArgDef.RPCTIMEOUT);
+        addCommandOption(CliArgDef.CONNREUSE);
+        addCommandOption(CliArgDef.CLIENTCOUNT);
+        addCommandOption(CliArgDef.OUTPUTINTERVAL);
+        addCommandOption(CliArgDef.SYNCPRODUCE);
+        addCommandOption(CliArgDef.SENDTHREADS);
+        addCommandOption(CliArgDef.WITHOUTDELAY);
+    }
+
+    /**
+     * Parse the command-line arguments and store the checked values in this instance.
+     *
+     * @param args the raw command-line arguments
+     * @return true once every supplied value has been accepted
+     * @throws Exception if a required option is missing or a value is not acceptable
+     */
+    public boolean parseParams(String[] args) throws Exception {
+        CommandLine cli = parser.parse(options, args);
+        if (cli == null) {
+            throw new ParseException("Parse args failure");
+        }
+        if (cli.hasOption(CliArgDef.VERSION.longOpt)) {
+            version();    // prints the version and exits the JVM
+        }
+        if (cli.hasOption(CliArgDef.HELP.longOpt)) {
+            help();       // prints the usage text and exits the JVM
+        }
+        masterServers = cli.getOptionValue(CliArgDef.MASTERSERVER.longOpt);
+        if (TStringUtils.isBlank(masterServers)) {
+            throw new Exception(CliArgDef.MASTERSERVER.longOpt + " is required!");
+        }
+        String topicStr = cli.getOptionValue(CliArgDef.PRDTOPIC.longOpt);
+        if (TStringUtils.isBlank(topicStr)) {
+            throw new Exception(CliArgDef.PRDTOPIC.longOpt + " is required!");
+        }
+        topicAndFiltersMap.putAll(MixedUtils.parseTopicParam(topicStr));
+        if (topicAndFiltersMap.isEmpty()) {
+            throw new Exception("Invalid " + CliArgDef.PRDTOPIC.longOpt + " parameter value!");
+        }
+        String msgCntStr = cli.getOptionValue(CliArgDef.MESSAGES.longOpt);
+        if (TStringUtils.isNotBlank(msgCntStr)) {
+            msgCount = Long.parseLong(msgCntStr);
+        }
+        String msgDataSizeStr = cli.getOptionValue(CliArgDef.MSGDATASIZE.longOpt);
+        if (TStringUtils.isNotBlank(msgDataSizeStr)) {
+            msgDataSize = Integer.parseInt(msgDataSizeStr);
+        }
+        String reuseConnStr = cli.getOptionValue(CliArgDef.CONNREUSE.longOpt);
+        if (TStringUtils.isNotBlank(reuseConnStr)) {
+            reuseConn = Boolean.parseBoolean(reuseConnStr);
+        }
+        String sendThreadCntStr = cli.getOptionValue(CliArgDef.SENDTHREADS.longOpt);
+        if (TStringUtils.isNotBlank(sendThreadCntStr)) {
+            sendThreadCnt = Math.max(1, Math.min(Integer.parseInt(sendThreadCntStr), 200));
+        }
+        String rpcTimeoutStr = cli.getOptionValue(CliArgDef.RPCTIMEOUT.longOpt);
+        if (TStringUtils.isNotBlank(rpcTimeoutStr)) {
+            rpcTimeoutMs = Long.parseLong(rpcTimeoutStr);
+        }
+        String clientCntStr = cli.getOptionValue(CliArgDef.CLIENTCOUNT.longOpt);
+        if (TStringUtils.isNotBlank(clientCntStr)) {
+            clientCount = Math.max(1, Integer.parseInt(clientCntStr));  // at least one client
+        }
+        String printIntMsStr = cli.getOptionValue(CliArgDef.OUTPUTINTERVAL.longOpt);
+        if (TStringUtils.isNotBlank(printIntMsStr)) {
+            printIntervalMs = Long.parseLong(printIntMsStr);
+            if (printIntervalMs < 5000) {
+                throw new Exception("Invalid "
+                        + CliArgDef.OUTPUTINTERVAL.longOpt
+                        + " parameter value!");
+            }
+        }
+        syncProduction = cli.hasOption(CliArgDef.SYNCPRODUCE.longOpt);
+        withoutDelay = cli.hasOption(CliArgDef.WITHOUTDELAY.longOpt);
+        return true;
+    }
+    // Build the test payload, create the producers and submit one send task per producer.
+    public void initTask() throws Exception {
+        // initial sent data
+        sentData = MixedUtils.buildTestData(msgDataSize);
+        // initial client configure
+        TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
+        clientConfig.setRpcTimeoutMs(rpcTimeoutMs);
+        // initial topic send round: one entry per topic, or per (topic, filter) pair
+        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                topicSendRounds.add(new TupleValue(entry.getKey()));
+            } else {
+                for (String filter : entry.getValue()) {
+                    topicSendRounds.add(new TupleValue(entry.getKey(), filter));
+                }
+            }
+        }
+        // send thread pool; NOTE(review): only clientCount tasks are ever submitted to it
+        sendExecutorService =
+                Executors.newFixedThreadPool(sendThreadCnt, new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable runnable) {
+                        return new Thread(runnable, "sender_" + producerMap.size());
+                    }
+                });
+        // initial producer object
+        if (reuseConn) {
+            // if reuse connection, use TubeSingleSessionFactory class
+            MessageSessionFactory msgSessionFactory =
+                    new TubeSingleSessionFactory(clientConfig);
+            this.sessionFactoryList.add(msgSessionFactory);
+            for (int i = 0; i < clientCount; i++) {
+                MessageProducer producer = msgSessionFactory.createProducer();
+                producer.publish(topicAndFiltersMap.keySet());
+                producerMap.put(producer, new MsgSender(producer));
+                // submit the send task
+                sendExecutorService.submit(producerMap.get(producer));
+            }
+        } else {
+            for (int i = 0; i < clientCount; i++) {
+                // if not reuse connection, use TubeMultiSessionFactory class
+                MessageSessionFactory msgSessionFactory =
+                        new TubeMultiSessionFactory(clientConfig);
+                this.sessionFactoryList.add(msgSessionFactory);
+                MessageProducer producer = msgSessionFactory.createProducer();
+                producer.publish(topicAndFiltersMap.keySet());
+                producerMap.put(producer, new MsgSender(producer));
+                // submit the send task
+                sendExecutorService.submit(producerMap.get(producer));
+            }
+        }
+        isStarted = true;
+    }
+
+    public void shutdown() throws Throwable {
+        // stop the send tasks first, then close the producers and the session factories
+        if (sendExecutorService != null) {
+            sendExecutorService.shutdownNow();
+        }
+        ThreadUtils.sleep(20);
+        for (MessageProducer producer : producerMap.keySet()) {
+            producer.shutdown();
+        }
+        for (MessageSessionFactory messageSessionFactory : sessionFactoryList) {
+            messageSessionFactory.shutdown();
+        }
+    }
+
+    // process message send: one task per producer, rotating over topicSendRounds
+    public class MsgSender implements Runnable {
+
+        private final MessageProducer producer;
+
+        public MsgSender(MessageProducer producer) {
+            this.producer = producer;
+        }
+
+        @Override
+        public void run() {
+            int topicAndCondCnt = topicSendRounds.size();
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
+            long sentCount = 0;
+            int roundIndex = 0;
+            while (msgCount < 0 || sentCount < msgCount) {    // negative msgCount: no limit
+                roundIndex = (int) (sentCount++ % topicAndCondCnt);
+                try {
+                    long millis = System.currentTimeMillis();
+                    TupleValue tupleValue = topicSendRounds.get(roundIndex);
+                    Message message = new Message(tupleValue.topic, sentData);
+                    if (tupleValue.filter != null) {
+                        // if include filter, add filter item with the send time (yyyyMMddHHmm)
+                        message.putSystemHeader(tupleValue.filter, sdf.format(new Date(millis)));
+                    }
+                    // sync: count the result here; async: the callback is handed the result
+                    if (syncProduction) {
+                        MessageSentResult procResult =
+                                producer.sendMessage(message);
+                        TOTAL_COUNTER.incrementAndGet();
+                        if (procResult.isSuccess()) {
+                            SENT_SUCC_COUNTER.incrementAndGet();
+                        } else {
+                            SENT_FAIL_COUNTER.incrementAndGet();
+                        }
+                    } else {
+                        producer.sendMessage(message, new DefaultSendCallback());
+                    }
+                } catch (Throwable e1) {    // NOTE(review): may also swallow an interrupt - confirm
+                    TOTAL_COUNTER.incrementAndGet();
+                    SENT_EXCEPT_COUNTER.incrementAndGet();
+                    logger.error("sendMessage exception: ", e1);
+                }
+                // Limit sending flow control to avoid frequent errors
+                // caused by too many inflight messages being sent
+                if (!withoutDelay) {
+                    if (sentCount % 5000 == 0) {
+                        ThreadUtils.sleep(3000);
+                    } else if (sentCount % 4000 == 0) {
+                        ThreadUtils.sleep(2000);
+                    } else if (sentCount % 2000 == 0) {
+                        ThreadUtils.sleep(800);
+                    } else if (sentCount % 1000 == 0) {
+                        ThreadUtils.sleep(400);
+                    }
+                }
+            }
+            // finished, close client
+            try {
+                producer.shutdown();
+            } catch (Throwable e) {
+                logger.error("producer shutdown error: ", e);
+            }
+        }
+    }
+
+    /**
+     * Callback for asynchronous sends that updates the shared send counters.
+     *
+     * <p>Declared {@code static} because it only touches static members, so an
+     * instance does not need to hold a reference to the enclosing object.
+     */
+    private static class DefaultSendCallback implements MessageSentCallback {
+
+        /**
+         * Counts the message as processed, then as a success or a failure
+         * according to {@code result.isSuccess()}.
+         */
+        @Override
+        public void onMessageSent(MessageSentResult result) {
+            TOTAL_COUNTER.incrementAndGet();
+            if (result.isSuccess()) {
+                SENT_SUCC_COUNTER.incrementAndGet();
+            } else {
+                SENT_FAIL_COUNTER.incrementAndGet();
+            }
+        }
+
+        /**
+         * Counts the message as processed and as an exception, and logs the cause.
+         */
+        @Override
+        public void onException(Throwable e) {
+            TOTAL_COUNTER.incrementAndGet();
+            SENT_EXCEPT_COUNTER.incrementAndGet();
+            logger.error("Send message error!", e);
+        }
+    }
+
+    /**
+     * Holder for a topic and an optional filter item; {@code filter} is
+     * {@code null} when the single-argument constructor is used.
+     */
+    private static class TupleValue {
+        public String topic;
+        public String filter;
+
+        public TupleValue(String topic) {
+            this(topic, null);
+        }
+
+        public TupleValue(String topic, String filter) {
+            this.filter = filter;
+            this.topic = topic;
+        }
+
+    }
+
+    public static void main(String[] args) {
+        CliProducer cliProducer = new CliProducer();
+        try {
+            boolean result = cliProducer.parseParams(args);
+            if (!result) {
+                throw new Exception("Parse parameters failure!");
+            }
+            cliProducer.initTask();
+            ThreadUtils.sleep(1000);
+            while (cliProducer.msgCount < 0
+                    || TOTAL_COUNTER.get() < cliProducer.msgCount * cliProducer.clientCount) {
+                ThreadUtils.sleep(cliProducer.printIntervalMs);
+                System.out.println("Required send count VS sent message count = "
+                        + (cliProducer.msgCount * cliProducer.clientCount)
+                        + " : " + TOTAL_COUNTER.get()
+                        + " (" + SENT_SUCC_COUNTER.get()
+                        + ":" + SENT_FAIL_COUNTER.get()
+                        + ":" + SENT_EXCEPT_COUNTER.get()
+                        + ")");
+            }
+            cliProducer.shutdown();
+            System.out.println("Finished, required send count VS sent message count = "
+                    + (cliProducer.msgCount * cliProducer.clientCount)
+                    + " : " + TOTAL_COUNTER.get()
+                    + " (" + SENT_SUCC_COUNTER.get()
+                    + ":" + SENT_FAIL_COUNTER.get()
+                    + ":" + SENT_EXCEPT_COUNTER.get()
+                    + ")");
+        } catch (Throwable ex) {
+            ex.printStackTrace();
+            logger.error(ex.getMessage());
+            cliProducer.help();
+        }
+
+    }
+
+
+}