You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/08 10:29:43 UTC

[incubator-inlong] branch master updated: [INLONG-1739][Improve] Optimization of TubeMQ SDK usage demo (#1767)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 209bc63  [INLONG-1739][Improve] Optimization of TubeMQ SDK usage demo (#1767)
209bc63 is described below

commit 209bc63bab8399a986f259486be5cb36c888e77d
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Nov 8 18:29:35 2021 +0800

    [INLONG-1739][Improve] Optimization of TubeMQ SDK usage demo (#1767)
---
 .../tubemq/client/config/TubeClientConfig.java     |   2 +-
 .../tubemq/client/consumer/MessageConsumer.java    |  32 ++
 .../inlong/tubemq/corebase/cluster/MasterInfo.java |   8 +-
 .../inlong/tubemq/corebase/utils/MixedUtils.java   |  55 ++++
 .../tubemq/example/MAMessageProducerExample.java   | 283 ++++++++---------
 .../tubemq/example/MessageConsumerExample.java     | 148 ---------
 .../tubemq/example/MessageProducerExample.java     | 203 +++++--------
 .../tubemq/example/MessagePullConsumerExample.java | 221 ++++++++------
 .../example/MessagePullSetConsumerExample.java     | 335 +++++++++++++--------
 .../tubemq/example/MessagePushConsumerExample.java | 153 ++++++++++
 ...{MsgRecvStats.java => MsgSendReceiveStats.java} |  35 ++-
 .../tubemq/server/tools/cli/CliProducer.java       |  35 +--
 12 files changed, 814 insertions(+), 696 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
index 83285b2..c567c37 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
@@ -29,7 +29,7 @@ import org.apache.inlong.tubemq.corerpc.RpcConstants;
  */
 public class TubeClientConfig {
     // Master information.
-    private MasterInfo masterInfo;
+    private final MasterInfo masterInfo;
     // Rpc read time out.
     private long rpcReadTimeoutMs = RpcConstants.CFG_RPC_READ_TIMEOUT_DEFAULT_MS;
     // Rpc connection processor number.
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
index 2f3a1ce..df98490 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
@@ -35,6 +35,11 @@ public interface MessageConsumer extends Shutdownable {
 
     boolean isFilterConsume(String topic);
 
+    /**
+     * Get consume offset information of the current registered partitions
+     *
+     * @return  consume offset information
+     */
     Map<String, ConsumeOffsetInfo> getCurConsumedPartitions() throws TubeClientException;
 
     /**
@@ -73,8 +78,35 @@ public interface MessageConsumer extends Shutdownable {
      */
     Map<String, Long> getFrozenPartInfo();
 
+    /**
+     * Start consuming messages with the default settings
+     */
     void completeSubscribe() throws TubeClientException;
 
+    /**
+     * Start consumption with the precise Offset settings
+     *
+     * The parameter sessionKey is specified by the caller, similar to the JobID in Flink,
+     * and is used to tell apart otherwise unrelated offset-reset consumption activities.
+     * Each reset operation must carry a sessionKey different from that of the last reset;
+     *
+     * The parameter sourceCount is used to inform the server how many consumers will consume
+     * in this round of consumer group activation, and the client will not consume data until
+     * the consumer group has reached the specified number of consumers.
+     *
+     * The parameter isSelectBig is used to inform the server whether, when multiple clients
+     * reset the offset of the same partition, the server should use the largest offset
+     * or the smallest offset as the standard;
+     *
+     * The parameter partOffsetMap is used to inform the server that this consumption expects
+     * the partitions in the Map to be consumed according to the specified offset value.
+     * The offset in the Map comes either from the consumer's query to the server or from the
+     * content returned when the consumer previously consumed data successfully, including
+     * the PeerInfo object returned by the callback function during Push consumption and
+     * the PeerInfo object in the ConsumerResult class during Pull consumption.
+     * The Key in the Map is the partitionKey carried in PeerInfo, and the value is
+     * the currOffset value carried in PeerInfo.
+     */
     void completeSubscribe(String sessionKey,
                            int sourceCount,
                            boolean isSelectBig,
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
index 2de42ba..680bb05 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
@@ -29,9 +29,9 @@ public class MasterInfo {
 
     private final Map<String/* ip:port */, NodeAddrInfo> addrMap4Failover =
             new HashMap<>();
-    private List<String> nodeHostPortList;
+    private final List<String> nodeHostPortList;
     private NodeAddrInfo firstNodeAddr = null;
-    private String masterClusterStr;
+    private final String masterClusterStr;
 
     /**
      * masterAddrInfo: "ip1:port,ip2:port"
@@ -70,9 +70,7 @@ public class MasterInfo {
             }
             int port = Integer.parseInt(hostPortItem[1].trim());
             NodeAddrInfo tmpNodeAddrInfo = new NodeAddrInfo(hostName, port);
-            if (addrMap4Failover.get(tmpNodeAddrInfo.getHostPortStr()) == null) {
-                addrMap4Failover.put(tmpNodeAddrInfo.getHostPortStr(), tmpNodeAddrInfo);
-            }
+            addrMap4Failover.putIfAbsent(tmpNodeAddrInfo.getHostPortStr(), tmpNodeAddrInfo);
             if (this.firstNodeAddr == null) {
                 this.firstNodeAddr = tmpNodeAddrInfo;
             }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
index eb432d5..e4629b2 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
@@ -18,11 +18,16 @@
 package org.apache.inlong.tubemq.corebase.utils;
 
 import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 
 import org.apache.commons.codec.binary.StringUtils;
+import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.TokenConstants;
 
 public class MixedUtils {
@@ -78,6 +83,25 @@ public class MixedUtils {
         return topicAndFiltersMap;
     }
 
+    // build the topic and filter item pair carried in the message
+    public static List<Tuple2<String, String>> buildTopicFilterTupleList(
+            Map<String, TreeSet<String>> topicAndFiltersMap) {
+        // initialize the list of send targets
+        List<Tuple2<String, String>> topicFilterTuples = new ArrayList<>();
+        // initialize the topic send rounds
+        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                topicFilterTuples.add(new Tuple2<>(entry.getKey()));
+            } else {
+                for (String filter : entry.getValue()) {
+                    topicFilterTuples.add(new Tuple2<>(entry.getKey(), filter));
+                }
+            }
+        }
+        return topicFilterTuples;
+    }
+
+    // only for demo
     public static byte[] buildTestData(int bodySize) {
         final byte[] transmitData =
                 StringUtils.getBytesUtf8("This is a test data!");
@@ -91,6 +115,37 @@ public class MixedUtils {
         return dataBuffer.array();
     }
 
+    // build message to be sent
+    // only for demo
+    public static Message buildMessage(String topicName, String filterItem,
+                                       byte[] bodyData, long serialId,
+                                       SimpleDateFormat sdf) {
+        // build message to be sent
+        Message message = new Message(topicName, bodyData);
+        long currTimeMillis = System.currentTimeMillis();
+        // add a serial number and the data generation time to each message
+        message.setAttrKeyVal("serialId", String.valueOf(serialId));
+        message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
+        if (filterItem != null) {
+            // add filter attribute information
+            message.putSystemHeader(filterItem, sdf.format(new Date(currTimeMillis)));
+        }
+        return message;
+    }
+
+    // only for demo
+    public static void coolSending(long msgSentCount) {
+        if (msgSentCount % 5000 == 0) {
+            ThreadUtils.sleep(3000);
+        } else if (msgSentCount % 4000 == 0) {
+            ThreadUtils.sleep(2000);
+        } else if (msgSentCount % 2000 == 0) {
+            ThreadUtils.sleep(800);
+        } else if (msgSentCount % 1000 == 0) {
+            ThreadUtils.sleep(400);
+        }
+    }
+
     // get the middle data between min, max, and data
     public static int mid(int data, int min, int max) {
         return Math.max(min, Math.min(max, data));
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
index 4a7ab2f..574caaa 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
@@ -17,20 +17,17 @@
 
 package org.apache.inlong.tubemq.example;
 
-import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.StringUtils;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
@@ -38,9 +35,7 @@ import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
 import org.apache.inlong.tubemq.client.producer.MessageProducer;
 import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
 import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,194 +48,180 @@ import org.slf4j.LoggerFactory;
 public class MAMessageProducerExample {
     private static final Logger logger =
             LoggerFactory.getLogger(MAMessageProducerExample.class);
-    private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
-    private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
-    private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0);
-    private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0);
-
-    private static final List<MessageProducer> PRODUCER_LIST = new ArrayList<>();
-    private static final int MAX_PRODUCER_NUM = 100;
-    private static final int SESSION_FACTORY_NUM = 10;
-
-    private static Map<String, TreeSet<String>> topicAndFiltersMap;
-    private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
-    private static int msgCount;
-    private static int clientCount;
-    private static byte[] sendData;
-    private static AtomicLong filterMsgCount = new AtomicLong(0);
-
-    private final Map<MessageProducer, Sender> producerMap = new HashMap<>();
-    private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
-    private final ExecutorService sendExecutorService =
-            Executors.newFixedThreadPool(MAX_PRODUCER_NUM, new ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable runnable) {
-                    return new Thread(runnable, "sender_" + producerMap.size());
-                }
-            });
-    private final AtomicInteger producerIndex = new AtomicInteger(0);
-
-    public MAMessageProducerExample(String masterHostAndPort) throws Exception {
-        TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
-        for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
-            this.sessionFactoryList.add(new TubeMultiSessionFactory(clientConfig));
-        }
-    }
-
-    public static void main(String[] args) {
-        // get call parameters
+    private static final MsgSendReceiveStats msgSendStats =
+            new MsgSendReceiveStats(true);
+    private static final Map<Integer, Tuple2<MessageSessionFactory, Set<MessageProducer>>>
+            sessionFactoryProducerMap = new HashMap<>();
+    private static final AtomicLong totalSentCnt = new AtomicLong(0);
+    private static ExecutorService sendExecutorService;
+
+    // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+    //     the master address(es) to connect to;
+    // The 2nd parameter pubTopicAndFilterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+    //     the topic(s) (and filter condition set) to publish to.
+    // The 3rd parameter msgCount is the number of messages that each producer needs to send
+    // The 4th parameter pkgSize is the body size of each message to be sent
+    // The 5th parameter clientCount is the number of producers
+    // The 6th parameter sessionFactoryCnt is the number of session factories
+    public static void main(String[] args) throws Throwable {
+        // 1. get call parameters
         final String masterServers = args[0];
-        final String topics = args[1];
-        msgCount = Integer.parseInt(args[2]);
-        clientCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
-        topicAndFiltersMap = MixedUtils.parseTopicParam(topics);
-        // initial topic send round
-        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
-            if (entry.getValue().isEmpty()) {
-                topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
-            } else {
-                for (String filter : entry.getValue()) {
-                    topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
-                }
-            }
+        final String pubTopicAndFilterItems = args[1];
+        final long msgCount = Long.parseLong(args[2]);
+        int pkgSize = 1024;
+        if (args.length > 3) {
+            pkgSize = MixedUtils.mid(Integer.parseInt(args[3]), 1, 1024 * 1024);
+        }
+        int clientCnt = 2;
+        if (args.length > 4) {
+            clientCnt = MixedUtils.mid(Integer.parseInt(args[4]), 1, 100);
         }
-        // build message's body content
-        final byte[] transmitData =
-                StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
-        final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
-        while (dataBuffer.hasRemaining()) {
-            int offset = dataBuffer.arrayOffset();
-            dataBuffer.put(transmitData, offset,
-                    Math.min(dataBuffer.remaining(), transmitData.length));
+        int sessionFactoryCnt = 10;
+        if (args.length > 5) {
+            sessionFactoryCnt = MixedUtils.mid(Integer.parseInt(args[5]), 1, 20);
+        }
+        final Map<String, TreeSet<String>> topicAndFiltersMap =
+                MixedUtils.parseTopicParam(pubTopicAndFilterItems);
+
+        // 2. build multi-session factory
+        TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
+        for (int i = 0; i < sessionFactoryCnt; i++) {
+            sessionFactoryProducerMap.put(i,
+                    new Tuple2<>(new TubeMultiSessionFactory(clientConfig), new HashSet<>()));
+        }
+
+        // 3. build multi-thread message sender
+        sendExecutorService =
+                Executors.newFixedThreadPool(clientCnt, new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable runnable) {
+                        return new Thread(runnable);
+                    }
+                });
+
+        // 4. initialize and start the statistic thread
+        Thread statisticThread =
+                new Thread(msgSendStats, "Sent Statistic Thread");
+        statisticThread.start();
+
+        // 5. build the content of the message to be sent
+        //    include message body, attributes, and time information template
+        final byte[] bodyData =
+                MixedUtils.buildTestData(pkgSize);
+        List<Tuple2<String, String>> buildTopicFilterTuples =
+                MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
+
+        // 6. Build and start producers by rotating through the session factory list
+        //    In the same process, different TubeMultiSessionFactory objects can create
+        //    independent connections to the same Broker.
+        //    We increase the concurrent throughput of the system by increasing the
+        //    number of connections. Here we distribute the clients evenly across
+        //    different TubeMultiSessionFactory objects to
+        //    improve data production performance within a single process
+        for (int indexId = 0; indexId < clientCnt; indexId++) {
+            Tuple2<MessageSessionFactory, Set<MessageProducer>> sessionProducerMap =
+                    sessionFactoryProducerMap.get(indexId % sessionFactoryCnt);
+            MessageProducer producer = sessionProducerMap.getF0().createProducer();
+            producer.publish(topicAndFiltersMap.keySet());
+            sessionProducerMap.getF1().add(producer);
+            sendExecutorService.submit(new Sender(indexId,
+                    producer, bodyData, buildTopicFilterTuples, msgCount));
         }
-        dataBuffer.flip();
-        sendData = dataBuffer.array();
-        // print started log
-        logger.info("MAMessageProducerExample.main started...");
 
+        // 7. wait all tasks finished
         try {
-            // initial producer objects
-            MAMessageProducerExample messageProducer =
-                    new MAMessageProducerExample(masterServers);
-            messageProducer.startService();
             // wait util sent message's count reachs required count
-            while (TOTAL_COUNTER.get() < msgCount * clientCount) {
-                logger.info("Sending, total messages is {}, filter messages is {}",
-                        SENT_SUCC_COUNTER.get(), filterMsgCount.get());
-                Thread.sleep(5000);
+            long needSentCnt = msgCount * clientCnt;
+            while (totalSentCnt.get() < needSentCnt) {
+                logger.info("Sending task is running, total = {}, finished = {}",
+                        needSentCnt, totalSentCnt.get());
+                Thread.sleep(30000);
             }
-            logger.info("Finished, total messages is {}, filter messages is {}",
-                    SENT_SUCC_COUNTER.get(), filterMsgCount.get());
-            messageProducer.producerMap.clear();
-            messageProducer.shutdown();
-        } catch (TubeClientException e) {
-            logger.error("TubeClientException: ", e);
         } catch (Throwable e) {
             logger.error("Throwable: ", e);
         }
-    }
-
-    public MessageProducer createProducer() throws TubeClientException {
-        int index = (producerIndex.incrementAndGet()) % SESSION_FACTORY_NUM;
-        return sessionFactoryList.get(index).createProducer();
-    }
-
-    private void startService() throws TubeClientException {
-        for (int i = 0; i < clientCount; i++) {
-            PRODUCER_LIST.add(createProducer());
-        }
-
-        for (MessageProducer producer : PRODUCER_LIST) {
-            if (producer != null) {
-                producerMap.put(producer, new Sender(producer));
-                sendExecutorService.submit(producerMap.get(producer));
-            }
-        }
-    }
-
-    public void shutdown() throws Throwable {
+        // clear resources
         sendExecutorService.shutdownNow();
-        for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
-            sessionFactoryList.get(i).shutdown();
+        for (int i = 0; i < sessionFactoryCnt; i++) {
+            sessionFactoryProducerMap.get(i).getF0().shutdown();
         }
-
+        msgSendStats.stopStats();
+        logger.info("Sending task is finished, total sent {} messages", totalSentCnt.get());
     }
 
-    public class Sender implements Runnable {
-        private MessageProducer producer;
+    public static class Sender implements Runnable {
 
-        public Sender(MessageProducer producer) {
+        private final int indexId;
+        private final MessageProducer producer;
+        private final byte[] bodyData;
+        private final long msgCount;
+        private final List<Tuple2<String, String>> topicFilterTuples;
+
+        public Sender(int indexId, MessageProducer producer, byte[] bodyData,
+                      List<Tuple2<String, String>> topicFilterTuples, long msgCount) {
+            this.indexId = indexId;
             this.producer = producer;
+            this.bodyData = bodyData;
+            this.msgCount = msgCount;
+            this.topicFilterTuples = topicFilterTuples;
         }
 
         @Override
         public void run() {
-            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
-            try {
-                producer.publish(topicAndFiltersMap.keySet());
-            } catch (Throwable t) {
-                logger.error("publish exception: ", t);
-            }
+            // send message to server
             long sentCount = 0;
             int roundIndex = 0;
-            int targetCnt = topicSendRounds.size();
+            int targetCnt = topicFilterTuples.size();
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
             while (msgCount < 0 || sentCount < msgCount) {
-                long millis = System.currentTimeMillis();
+                // 1 Rotate to get the attribute information to be sent
                 roundIndex = (int) (sentCount++ % targetCnt);
-                Tuple2<String, String> target = topicSendRounds.get(roundIndex);
-                Message message = new Message(target.getF0(), sendData);
-                message.setAttrKeyVal("index", String.valueOf(sentCount));
-                message.setAttrKeyVal("dataTime", String.valueOf(millis));
-                if (target.getF1() != null) {
-                    filterMsgCount.incrementAndGet();
-                    message.putSystemHeader(target.getF1(), sdf.format(new Date(millis)));
-                }
+                Tuple2<String, String> target = topicFilterTuples.get(roundIndex);
+
+                // 2 send message
                 try {
-                    // next line sends message synchronously, which is not recommended
-                    //producer.sendMessage(message);
-                    // send message asynchronously, recommended
-                    producer.sendMessage(message, new DefaultSendCallback());
-                } catch (Throwable e1) {
-                    TOTAL_COUNTER.incrementAndGet();
-                    SENT_EXCEPT_COUNTER.incrementAndGet();
-                    logger.error("sendMessage exception: ", e1);
-                }
-                TOTAL_COUNTER.incrementAndGet();
-                // only for test, delay inflight message's count
-                if (sentCount % 5000 == 0) {
-                    ThreadUtils.sleep(3000);
-                } else if (sentCount % 4000 == 0) {
-                    ThreadUtils.sleep(2000);
-                } else if (sentCount % 2000 == 0) {
-                    ThreadUtils.sleep(800);
-                } else if (sentCount % 1000 == 0) {
-                    ThreadUtils.sleep(400);
+                    // 2.1 Send data asynchronously, recommended
+                    producer.sendMessage(MixedUtils.buildMessage(target.getF0(),
+                            target.getF1(), bodyData, sentCount, sdf), new DefaultSendCallback());
+                    // Or
+                    // 2.2 Send message synchronously, not recommended
+                    // MessageSentResult result = producer.sendMessage(message);
+                    // totalSentCnt.incrementAndGet();
+                    // if (!result.isSuccess()) {
+                    //    logger.error("Sync-send message failed!" + result.getErrMsg());
+                    // }
+                } catch (TubeClientException | InterruptedException e) {
+                    logger.error("Send message failed!", e);
                 }
+                // 3 Cool sending
+                //     Attention: only used in the test environment, to solve the problem of
+                //                frequent sending failures caused by insufficient test resources.
+                MixedUtils.coolSending(sentCount);
             }
             try {
                 producer.shutdown();
             } catch (Throwable e) {
                 logger.error("producer shutdown error: ", e);
             }
-
+            logger.info("The message sending task(" + indexId + ") has been completed!");
         }
     }
 
-    private class DefaultSendCallback implements MessageSentCallback {
+    private static class DefaultSendCallback implements MessageSentCallback {
+
         @Override
         public void onMessageSent(MessageSentResult result) {
-            TOTAL_COUNTER.incrementAndGet();
+            totalSentCnt.incrementAndGet();
             if (result.isSuccess()) {
-                SENT_SUCC_COUNTER.incrementAndGet();
+                msgSendStats.addMsgCount(result.getMessage().getTopic(), 1);
             } else {
-                SENT_FAIL_COUNTER.incrementAndGet();
+                logger.error("Send message failed!" + result.getErrMsg());
             }
         }
 
         @Override
         public void onException(Throwable e) {
-            TOTAL_COUNTER.incrementAndGet();
-            SENT_EXCEPT_COUNTER.incrementAndGet();
+            totalSentCnt.incrementAndGet();
             logger.error("Send message error!", e);
         }
     }
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java
deleted file mode 100644
index f251a67..0000000
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.example;
-
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.inlong.tubemq.client.common.PeerInfo;
-import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
-import org.apache.inlong.tubemq.client.consumer.MessageListener;
-import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
-import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
-import org.apache.inlong.tubemq.corebase.Message;
-import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This demo shows how to consume message sequentially.
- *
- *Consumer supports subscribe multiple topics in one consume group. Message from subscription
- * sent back to business logic via callback {@link MessageListener}. It is highly recommended NOT
- * to perform any blocking operation inside the callback.
- *
- *As for consumption control of {@link PushMessageConsumer}, business logic is able to monitor
- * current state and adjust consumption by
- *
- *<ul>
- *     <li>call {@link PushMessageConsumer#pauseConsume()} to pause consumption when high water mark exceeded.</li>
- *     <li>call {@link PushMessageConsumer#resumeConsume()} to resume consumption</li>
- * </ul>
- */
-public final class MessageConsumerExample {
-
-    private static final Logger logger =
-            LoggerFactory.getLogger(MessageConsumerExample.class);
-    private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
-
-    private final PushMessageConsumer messageConsumer;
-    private final MessageSessionFactory messageSessionFactory;
-
-    public MessageConsumerExample(String masterHostAndPort,
-                                  String group, int fetchThreadCnt) throws Exception {
-        ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
-        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
-        if (fetchThreadCnt > 0) {
-            consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
-        }
-        this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
-        this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
-    }
-
-    public static void main(String[] args) {
-        final String masterServers = args[0];
-        final String topics = args[1];
-        final String group = args[2];
-        final int clientCount = Integer.parseInt(args[3]);
-        int threadCnt = -1;
-        if (args.length > 5) {
-            threadCnt = Integer.parseInt(args[4]);
-        }
-        final int fetchThreadCnt = threadCnt;
-        final Map<String, TreeSet<String>> topicAndFiltersMap =
-                MixedUtils.parseTopicParam(topics);
-        final ExecutorService executorService =
-                Executors.newCachedThreadPool();
-        for (int i = 0; i < clientCount; i++) {
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        MessageConsumerExample messageConsumer =
-                                new MessageConsumerExample(masterServers,
-                                        group, fetchThreadCnt);
-                        messageConsumer.subscribe(topicAndFiltersMap);
-                    } catch (Exception e) {
-                        logger.error("Create consumer failed!", e);
-                    }
-                }
-            });
-        }
-        final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread");
-        statisticThread.start();
-
-        executorService.shutdown();
-        try {
-            executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            logger.error("Thread Pool shutdown has been interrupted!");
-        }
-        msgRecvStats.stopStats();
-    }
-
-    public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
-        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
-            MessageListener messageListener = new DefaultMessageListener(entry.getKey());
-            messageConsumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
-        }
-        messageConsumer.completeSubscribe();
-    }
-
-    public static class DefaultMessageListener implements MessageListener {
-
-        private String topic;
-
-        public DefaultMessageListener(String topic) {
-            this.topic = topic;
-        }
-
-        @Override
-        public void receiveMessages(PeerInfo peerInfo, List<Message> messages) {
-            if (messages != null && !messages.isEmpty()) {
-                msgRecvStats.addMsgCount(this.topic, messages.size());
-            }
-        }
-
-        @Override
-        public Executor getExecutor() {
-            return null;
-        }
-
-        @Override
-        public void stop() {
-        }
-    }
-}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
index 18462c7..afffde7 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
@@ -17,17 +17,11 @@
 
 package org.apache.inlong.tubemq.example;
 
-import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.StringUtils;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
@@ -35,9 +29,7 @@ import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.inlong.tubemq.client.producer.MessageProducer;
 import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
 import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,144 +44,95 @@ public final class MessageProducerExample {
 
     private static final Logger logger =
             LoggerFactory.getLogger(MessageProducerExample.class);
-    private static final ConcurrentHashMap<String, AtomicLong> counterMap =
-            new ConcurrentHashMap<>();
+    private static final MsgSendReceiveStats msgSendStats =
+            new MsgSendReceiveStats(true);
 
-    private final MessageProducer messageProducer;
-    private final MessageSessionFactory messageSessionFactory;
+    private static MessageSessionFactory sessionFactory;
+    private static MessageProducer messageProducer;
 
-    public MessageProducerExample(String masterServers) throws Exception {
-        TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
-        this.messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
-        this.messageProducer = messageSessionFactory.createProducer();
-    }
-
-    public static void main(String[] args) {
-        // get and initial parameters
+    // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+    //     the master address(es) to connect to;
+    // The 2nd parameter pubTopicAndFilterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+    //     the topic(s) (and filter condition set) to publish to.
+    // The 3rd parameter msgCount is the message amount that needs to be sent
+    // The 4th parameter pkgSize is the message's body size that needs to be sent
+    public static void main(String[] args) throws Throwable {
+        // 1. get and initialize parameters
         final String masterServers = args[0];
-        final String topics = args[1];
+        final String pubTopicAndFilterItems = args[1];
         final long msgCount = Long.parseLong(args[2]);
-        final Map<String, TreeSet<String>> topicAndFiltersMap =
-                MixedUtils.parseTopicParam(topics);
-        // initial send target
-        final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
-        // initial topic send round
-        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
-            if (entry.getValue().isEmpty()) {
-                topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
-            } else {
-                for (String filter : entry.getValue()) {
-                    topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
-                }
-            }
-        }
-        // initial sent data
-        String body = "This is a test message from single-session-factory.";
-        byte[] bodyBytes = StringUtils.getBytesUtf8(body);
-        final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
-        while (dataBuffer.hasRemaining()) {
-            int offset = dataBuffer.arrayOffset();
-            dataBuffer.put(bodyBytes, offset, Math.min(dataBuffer.remaining(), bodyBytes.length));
-        }
-        dataBuffer.flip();
-        // send messages
-        try {
-            long sentCount = 0;
-            int roundIndex = 0;
-            int targetCnt = topicSendRounds.size();
-            MessageProducerExample messageProducer =
-                    new MessageProducerExample(masterServers);
-            messageProducer.publishTopics(topicAndFiltersMap.keySet());
-            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
-            while (msgCount < 0 || sentCount < msgCount) {
-                roundIndex = (int) (sentCount++ % targetCnt);
-                Tuple2<String, String> target = topicSendRounds.get(roundIndex);
-                Message message = new Message(target.getF0(), dataBuffer.array());
-                long currTimeMillis = System.currentTimeMillis();
-                message.setAttrKeyVal("index", String.valueOf(sentCount));
-                message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
-                if (target.getF1() != null) {
-                    message.putSystemHeader(target.getF1(), sdf.format(new Date(currTimeMillis)));
-                }
-                try {
-                    // 1.1 next line sends message synchronously, which is not recommended
-                    // messageProducer.sendMessage(message);
-                    // 1.2 send message asynchronously, recommended
-                    messageProducer.sendMessageAsync(message,
-                            messageProducer.new DefaultSendCallback());
-                } catch (Throwable e1) {
-                    logger.error("Send Message throw exception  ", e1);
-                }
-                // only for test, delay inflight message's count
-                if (sentCount % 20000 == 0) {
-                    ThreadUtils.sleep(4000);
-                } else if (sentCount % 10000 == 0) {
-                    ThreadUtils.sleep(2000);
-                } else if (sentCount % 2500 == 0) {
-                    ThreadUtils.sleep(300);
-                }
-            }
-            ThreadUtils.sleep(20000);
-            for (Map.Entry<String, AtomicLong> entry : counterMap.entrySet()) {
-                logger.info(
-                    "********* Current {} Message sent count is {}",
-                    entry.getKey(),
-                    entry.getValue().get()
-                );
-            }
-        } catch (Throwable e) {
-            e.printStackTrace();
+        int pkgSize = 1024;
+        if (args.length > 3) {
+            pkgSize = MixedUtils.mid(Integer.parseInt(args[3]), 1, 1024 * 1024);
         }
-    }
+        final Map<String, TreeSet<String>> topicAndFiltersMap =
+                MixedUtils.parseTopicParam(pubTopicAndFilterItems);
 
-    public void publishTopics(Set<String> topicSet) throws TubeClientException {
-        this.messageProducer.publish(topicSet);
-    }
+        // 2. initialize the configuration, session factory object, and producer object
+        TubeClientConfig clientConfig =
+                new TubeClientConfig(masterServers);
+        sessionFactory = new TubeSingleSessionFactory(clientConfig);
+        messageProducer = sessionFactory.createProducer();
+        messageProducer.publish(topicAndFiltersMap.keySet());
+
+        // 3. initialize and start the statistic thread
+        Thread statisticThread =
+                new Thread(msgSendStats, "Sent Statistic Thread");
+        statisticThread.start();
+
+        // 4. build the content of the message to be sent
+        //    include message body, attributes, and time information template
+        final byte[] bodyData =
+                MixedUtils.buildTestData(pkgSize);
+        List<Tuple2<String, String>> buildTopicFilterTuples =
+                MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
 
-    /**
-     * Send message synchronous.
-     */
-    public void sendMessage(Message message) {
-        // date format is accurate to minute, not to second
-        try {
-            MessageSentResult result = messageProducer.sendMessage(message);
-            if (!result.isSuccess()) {
-                logger.error("Sync-send message failed!" + result.getErrMsg());
+        // 5. send message to server
+        long sentCount = 0;
+        int roundIndex = 0;
+        int targetCnt = buildTopicFilterTuples.size();
+        while (msgCount < 0 || sentCount < msgCount) {
+            // 5.1 Rotate to get the attribute information to be sent
+            roundIndex = (int) (sentCount++ % targetCnt);
+            Tuple2<String, String> target = buildTopicFilterTuples.get(roundIndex);
+
+            // 5.2 send message
+            try {
+                // 5.2.1 Send data asynchronously, recommended
+                messageProducer.sendMessage(MixedUtils.buildMessage(target.getF0(),
+                        target.getF1(), bodyData, sentCount, sdf), new DefaultSendCallback());
+                // Or
+                // 5.2.2 Send message synchronous, not recommended
+                // MessageSentResult result = messageProducer.sendMessage(message);
+                // if (!result.isSuccess()) {
+                //    logger.error("Sync-send message failed!" + result.getErrMsg());
+                // }
+            } catch (TubeClientException | InterruptedException e) {
+                logger.error("Send message failed!", e);
             }
-        } catch (TubeClientException | InterruptedException e) {
-            logger.error("Sync-send message failed!", e);
-        }
-    }
 
-    /**
-     * Send message asynchronous. More efficient and recommended.
-     */
-    public void sendMessageAsync(Message message, MessageSentCallback callback) {
-        try {
-            messageProducer.sendMessage(message, callback);
-        } catch (TubeClientException | InterruptedException e) {
-            logger.error("Async-send message failed!", e);
+            // 5.3 Cool sending
+            //     Attention: only used in the test link, to solve the problem of
+            //                frequent sending failures caused by insufficient test resources.
+            MixedUtils.coolSending(sentCount);
         }
+
+        // 6. Clean up resources and exit the service after the task is completed
+        //    Attention: TubeMQ client is suitable for serving as a resident service,
+        //               not suitable for creating Producer objects message by message
+        messageProducer.shutdown();
+        sessionFactory.shutdown();
+        msgSendStats.stopStats();
+        logger.info("The message sending task has been completed!");
     }
 
-    private class DefaultSendCallback implements MessageSentCallback {
+    private static class DefaultSendCallback implements MessageSentCallback {
 
         @Override
         public void onMessageSent(MessageSentResult result) {
             if (result.isSuccess()) {
-                String topicName = result.getMessage().getTopic();
-
-                AtomicLong currCount = counterMap.get(topicName);
-                if (currCount == null) {
-                    AtomicLong tmpCount = new AtomicLong(0);
-                    currCount = counterMap.putIfAbsent(topicName, tmpCount);
-                    if (currCount == null) {
-                        currCount = tmpCount;
-                    }
-                }
-                if (currCount.incrementAndGet() % 1000 == 0) {
-                    logger.info("Send " + topicName + " " + currCount.get() + " message!");
-                }
+                msgSendStats.addMsgCount(result.getMessage().getTopic(), 1);
             } else {
                 logger.error("Send message failed!" + result.getErrMsg());
             }
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
index 4c243fe..bf51d9e 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
@@ -18,15 +18,14 @@
 package org.apache.inlong.tubemq.example;
 
 import static org.apache.inlong.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
+
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumeOffsetInfo;
 import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
 import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
 import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.inlong.tubemq.corebase.Message;
@@ -35,117 +34,155 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This demo shows how to consume message by pull.
+ * This demo shows how to consume messages through TubeSingleSessionFactory + PullMessageConsumer
  *
  *Consume message in pull mode achieved by {@link PullMessageConsumer#getMessage()}.
  * Note that whenever {@link PullMessageConsumer#getMessage()} returns successfully, the
- * return value(whether or not to be {@code null}) should be processed by
+ * return value(whether or not to be {@code null}) must be processed by
  * {@link PullMessageConsumer#confirmConsume(String, boolean)}.
  */
 public final class MessagePullConsumerExample {
 
     private static final Logger logger =
             LoggerFactory.getLogger(MessagePullConsumerExample.class);
-    private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
-
-    private final PullMessageConsumer messagePullConsumer;
-    private final MessageSessionFactory messageSessionFactory;
-
-    public MessagePullConsumerExample(String masterHostAndPort, String group) throws Exception {
-        ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
-        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
-        this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
-        this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
-    }
 
+    private static final MsgSendReceiveStats msgRcvStats =
+            new MsgSendReceiveStats(false);
+    private static PullMessageConsumer pullConsumer;
+    private static MessageSessionFactory sessionFactory;
+
+    // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+    //     the master address(es) to connect to;
+    // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+    //     the topic(s) (and filter condition set) to consume on.
+    // The 3rd parameter groupName is the name of consumer group
+    // The 4th parameter consumeCount is the amount of messages that need to be consumed
+    // The 5th parameter fetchThreadCnt is the count of fetch thread
     public static void main(String[] args) throws Throwable {
-        // get and initial parameters
+        // 1. get and initialize parameters
         final String masterServers = args[0];
-        final String topics = args[1];
-        final String group = args[2];
-        final int msgCount = Integer.parseInt(args[3]);
+        final String subTopicAndFiterItems = args[1];
+        final String groupName = args[2];
+        final int consumeCount = Integer.parseInt(args[3]);
+        int fetchThreadCnt = 3;
+        if (args.length > 4) {
+            fetchThreadCnt = MixedUtils.mid(Integer.parseInt(args[4]),
+                    1, Runtime.getRuntime().availableProcessors());
+        }
         final Map<String, TreeSet<String>> topicAndFiltersMap =
-                MixedUtils.parseTopicParam(topics);
-        // initial consumer object
-        final MessagePullConsumerExample messageConsumer =
-                new MessagePullConsumerExample(masterServers, group);
-        messageConsumer.subscribe(topicAndFiltersMap);
-        Thread[] fetchRunners = new Thread[3];
+                MixedUtils.parseTopicParam(subTopicAndFiterItems);
+
+        // 2. initialize the configuration and build the session factory object
+        ConsumerConfig consumerConfig =
+                new ConsumerConfig(masterServers, groupName);
+        // 2.1 consume from the latest position if the consumer group is consuming for the first time
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        // 2.2 build session factory object
+        //     Attention: here we are using the TubeSingleSessionFactory object(a
+        //                singleton session factory can only create one object in a process,
+        //                requiring all topics to be in one cluster.
+        //                If the topics subscribed to by the objects in the process are
+        //                in different clusters, then need to use the TubeMultiSessionFactory class,
+        //                please refer to the example of TubeMultiSessionFactory usage)
+        sessionFactory = new TubeSingleSessionFactory(consumerConfig);
+
+        // 3 build consumer object
+        //    we can construct multiple consumers after the creation of the session factory object
+        pullConsumer = sessionFactory.createPullConsumer(consumerConfig);
+        // 3.1 Set the Topic and the filter item set corresponding to the consumption
+        //     if you do not need filtered consumption,
+        //     set the parameter filterConds to null or an empty set
+        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+            pullConsumer.subscribe(entry.getKey(), entry.getValue());
+        }
+        // 3.2 start consumption
+        pullConsumer.completeSubscribe();
+
+        // 4. initialize the fetch threads
+        Thread[] fetchRunners = new Thread[fetchThreadCnt];
         for (int i = 0; i < fetchRunners.length; i++) {
-            fetchRunners[i] = new Thread(new FetchRequestRunner(messageConsumer, msgCount));
-            fetchRunners[i].setName("_fetch_runner_" + i);
+            fetchRunners[i] = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    ConsumerResult csmResult;
+                    int getCount = consumeCount;
+                    // wait partition status ready
+                    do {
+                        if (pullConsumer.isPartitionsReady(5000)
+                                || pullConsumer.isShutdown()) {
+                            break;
+                        }
+                    } while (true);
+                    // consume messages
+                    do {
+                        try {
+                            // 4.1 check whether the consumer has been shut down
+                            if (pullConsumer.isShutdown()) {
+                                logger.warn("Consumer is shutdown!");
+                                break;
+                            }
+                            // 4.2 get messages from server
+                            csmResult = pullConsumer.getMessage();
+                            if (csmResult.isSuccess()) {
+                                // 4.2.1 process message if getMessage() return success
+                                List<Message> messageList = csmResult.getMessageList();
+                                if (messageList != null && !messageList.isEmpty()) {
+                                    msgRcvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+                                }
+                                // 4.2.1.1 confirm consume result
+                                // Notice:
+                                //    1. If the processing fails, the parameter isConsumed can
+                                //       be set to false, but this is likely to cause
+                                //       an infinite loop of data consumption, so
+                                //       it is strongly recommended to set this parameter
+                                //       to true when using it, and the failed data can
+                                //       be processed in other ways
+                                //    2. The messageList returned by getMessage() may be empty,
+                                //       and calling confirmConsume() is still required in this case
+                                pullConsumer.confirmConsume(csmResult.getConfirmContext(), true);
+                            } else {
+                                // 4.2.2 process failure when getMessage() return false
+                                //       Any failure can be ignored
+                                if (!IGNORE_ERROR_SET.contains(csmResult.getErrCode())) {
+                                    logger.debug(
+                                            "Receive messages errorCode is {}, Error message is {}",
+                                            csmResult.getErrCode(), csmResult.getErrMsg());
+                                }
+                            }
+                            // 4.3 Determine whether the consumed data reaches the goal
+                            if (consumeCount > 0) {
+                                if (--getCount <= 0) {
+                                    logger.info("Consumer has consumed {} messages!", consumeCount);
+                                    break;
+                                }
+                            }
+                        } catch (Throwable e) {
+                            // Any exceptions in running can be ignored
+                        }
+                    } while (true);
+                    logger.info("The fetch thread has exited!");
+                }
+            }, "_fetch_runner_" + i);
         }
-        // initial fetch threads
+
+        // 5. start fetch threads
         for (Thread thread : fetchRunners) {
             thread.start();
         }
-        // initial statistic thread
+
+        // 6. initialize and start the statistic thread
         Thread statisticThread =
-                new Thread(msgRecvStats, "Sent Statistic Thread");
+                new Thread(msgRcvStats, "Receive Statistic Thread");
         statisticThread.start();
-    }
-
-    public void subscribe(
-            Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
-        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
-            messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
-        }
-        messagePullConsumer.completeSubscribe();
-    }
-
-    public ConsumerResult getMessage() throws TubeClientException {
-        return messagePullConsumer.getMessage();
-    }
-
-    public ConsumerResult confirmConsume(final String confirmContext,
-                                         boolean isConsumed) throws TubeClientException {
-        return messagePullConsumer.confirmConsume(confirmContext, isConsumed);
-    }
-
-    public Map<String, ConsumeOffsetInfo> getCurrPartitionOffsetMap() throws TubeClientException {
-        return messagePullConsumer.getCurConsumedPartitions();
-    }
 
-    private static class FetchRequestRunner implements Runnable {
-
-        final MessagePullConsumerExample messageConsumer;
-        final int consumeCount;
-
-        FetchRequestRunner(final MessagePullConsumerExample messageConsumer, int msgCount) {
-            this.messageConsumer = messageConsumer;
-            this.consumeCount = msgCount;
-        }
-
-        @Override
-        public void run() {
-            try {
-                int getCount = consumeCount;
-                do {
-                    ConsumerResult result = messageConsumer.getMessage();
-                    if (result.isSuccess()) {
-                        List<Message> messageList = result.getMessageList();
-                        if (messageList != null && !messageList.isEmpty()) {
-                            msgRecvStats.addMsgCount(result.getTopicName(), messageList.size());
-                        }
-                        messageConsumer.confirmConsume(result.getConfirmContext(), true);
-                    } else {
-                        if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
-                            logger.info(
-                                    "Receive messages errorCode is {}, Error message is {}",
-                                    result.getErrCode(), result.getErrMsg());
-                        }
-                    }
-                    if (consumeCount > 0) {
-                        if (--getCount <= 0) {
-                            break;
-                        }
-                    }
-                } while (true);
-                msgRecvStats.stopStats();
-            } catch (TubeClientException e) {
-                logger.error("Create consumer failed!", e);
-            }
-        }
+        // 7. Resource cleanup when exiting the service
+        //
+        // 7.1 shutdown consumers
+        // pullConsumer.shutdown();
+        // 7.2 shutdown session factory
+        // sessionFactory.shutdown();
+        // 7.3 shutdown statistic thread
+        // msgRcvStats.stopStats();
     }
 }
 
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
index d50b431..2bc7080 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
@@ -17,29 +17,31 @@
 
 package org.apache.inlong.tubemq.example;
 
-import static org.apache.inlong.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumeOffsetInfo;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
 import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
 import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.cluster.MasterInfo;
 import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This demo shows how to reset offset on consuming. The main difference from {@link MessagePullConsumerExample}
+ * This demo shows how to consume messages with bootstrap offset through
+ * TubeMultiSessionFactory + PullMessageConsumer
+ *
+ * First, we start a consumer for data consumption, and then we exit it, start another consumer
+ * with the same group name, and set the partition's bootstrap offset to 0 for data consumption.
+ *
+ * The main difference from {@link MessagePullConsumerExample}
  * is that we call {@link PullMessageConsumer#completeSubscribe(String, int, boolean, Map)} instead of
  * {@link PullMessageConsumer#completeSubscribe()}. The former supports multiple options to configure
  * when to reset offset.
@@ -48,141 +50,214 @@ public final class MessagePullSetConsumerExample {
 
     private static final Logger logger =
             LoggerFactory.getLogger(MessagePullSetConsumerExample.class);
-    private static final AtomicLong counter = new AtomicLong(0);
-
-    private final PullMessageConsumer messagePullConsumer;
-    private final MessageSessionFactory messageSessionFactory;
-
-    public MessagePullSetConsumerExample(String masterHostAndPort,
-                                         String group) throws Exception {
-        ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
-        this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
-        this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
-    }
+    // Statistic object
+    private static final MsgSendReceiveStats msgRcvStats =
+            new MsgSendReceiveStats(false);
+    // The map of the master cluster and Multiple session factory
+    //    There may be multiple consumers in the same process and the topic sets subscribed
+    //    by different consumers are in different clusters. In this case,
+    //    we need to construct session factories by TubeMultiSessionFactory class.
+    private static final ConcurrentHashMap<String, MessageSessionFactory> multSessFtyMap =
+            new ConcurrentHashMap<>();
 
-    public static void main(String[] args) {
-        // get and initial parameters
+    // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+    //     the master address(es) to connect to;
+    // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+    //     the topic(s) (and filter condition set) to consume on.
+    // The 3rd parameter groupName is the name of consumer group
+    // The 4th parameter consumeCount is the amount of messages that need to be consumed
+    // The 5th parameter fetchThreadCnt (optional, default 3) is the count of fetch threads
+    public static void main(String[] args) throws Throwable {
+        // 1. get and initial parameters
         final String masterServers = args[0];
-        final String topics = args[1];
-        final String group = args[2];
-        final int msgCount = Integer.parseInt(args[3]);
+        final String subTopicAndFiterItems = args[1];
+        final String groupName = args[2];
+        final int consumeCount = Integer.parseInt(args[3]);
+        int fetchThreadCnt = 3;
+        if (args.length > 4) {
+            fetchThreadCnt = MixedUtils.mid(Integer.parseInt(args[4]),
+                    1, Runtime.getRuntime().availableProcessors());
+        }
         final Map<String, TreeSet<String>> topicAndFiltersMap =
-                MixedUtils.parseTopicParam(topics);
-        // initial reset offset parameters
-        // (The offset specified is only a demo)
-        final Map<String, Long> partOffsetMap =
+                MixedUtils.parseTopicParam(subTopicAndFiterItems);
+
+        // 2. initialize and start the statistic thread
+        Thread statisticThread =
+                new Thread(msgRcvStats, "Receive Statistic Thread");
+        statisticThread.start();
+
+        // 3. Start the consumer group for the first consumption
+        // 3.1. build consumer object
+        logger.info("The first consumption begin!");
+        PullMessageConsumer consumer1 =
+                createPullConsumer(masterServers, groupName);
+        // 3.2. Set the topic and the filter item set corresponding to the consumption;
+        //     if you do not need filtered consumption,
+        //     set the parameter filterConds to null or an empty set
+        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+            consumer1.subscribe(entry.getKey(), entry.getValue());
+        }
+        // 3.3. complete the subscription with session key, source count and bootstrap offsets
+        String sessionKeyFst = "test_consume_first";
+        int sourceCountFst = 1;
+        boolean isSelectBig = false;
+        // The map of partitionKey and last success offset
+        //    You can persist the information and use it when restarting or
+        //    re-rolling to keep the current consumption to start from
+        //    the offset required in the last round
+        ConcurrentHashMap<String, Long> partOffsetMapFst =
                 new ConcurrentHashMap<>();
-        partOffsetMap.put("123:test_1:0", 0L);
-        partOffsetMap.put("123:test_1:1", 0L);
-        partOffsetMap.put("123:test_1:2", 0L);
-        partOffsetMap.put("123:test_2:0", 350L);
-        partOffsetMap.put("123:test_2:1", 350L);
-        partOffsetMap.put("123:test_2:2", 350L);
-
-        ExecutorService executorService = Executors.newCachedThreadPool();
-        executorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    int getCount = msgCount;
-                    MessagePullSetConsumerExample messageConsumer =
-                        new MessagePullSetConsumerExample(masterServers, group);
-                    messageConsumer.subscribe(topicAndFiltersMap, partOffsetMap);
-                    // main logic of consuming
-                    do {
-                        ConsumerResult result = messageConsumer.getMessage();
-                        if (result.isSuccess()) {
-                            List<Message> messageList = result.getMessageList();
-                            if (messageList != null) {
-                                logger.info("Receive messages:" + counter.addAndGet(messageList.size()));
-                            }
-
-                            // Offset returned by GetMessage represents the initial offset of this request
-                            // if consumer group is pure Pull mode, the initial offset can be saved;
-                            // if not, we have to use the return value of confirmConsume
-                            long oldValue = partOffsetMap.get(
-                                result.getPartitionKey()) == null
-                                ? -1
-                                : partOffsetMap.get(result.getPartitionKey());
-                            partOffsetMap.put(result.getPartitionKey(), result.getCurrOffset());
-                            logger.info(
-                                "GetMessage , partitionKey={}, oldValue={}, newVal={}",
-                                new Object[]{result.getPartitionKey(), oldValue, result.getCurrOffset()});
-
-                            // save the Offset from the return value of confirmConsume
-                            ConsumerResult confirmResult = messageConsumer.confirmConsume(
-                                result.getConfirmContext(),
-                                true);
-                            if (confirmResult.isSuccess()) {
-                                oldValue = partOffsetMap.get(
-                                    result.getPartitionKey()) == null
-                                    ? -1
-                                    : partOffsetMap.get(result.getPartitionKey());
-                                partOffsetMap.put(result.getPartitionKey(), confirmResult.getCurrOffset());
-                                logger.info(
-                                    "ConfirmConsume , partitionKey={}, oldValue={}, newVal={}",
-                                    new Object[]{
-                                        confirmResult.getPartitionKey(),
-                                        oldValue,
-                                        confirmResult.getCurrOffset()});
-                            } else {
-                                logger.info(
-                                    "ConfirmConsume failure, errCode is {}, errInfo is {}.",
-                                    confirmResult.getErrCode(),
-                                    confirmResult.getErrMsg());
-                            }
-                        } else {
-                            if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
-                                logger.info(
-                                        "Receive messages errorCode is {}, Error message is {}",
-                                        result.getErrCode(), result.getErrMsg());
-                            }
-                        }
-                        if (msgCount >= 0) {
-                            if (--getCount <= 0) {
-                                break;
-                            }
-                        }
-                    } while (true);
-                } catch (Exception e) {
-                    logger.error("Create consumer failed!", e);
-                }
-            }
-        });
+        consumer1.completeSubscribe(sessionKeyFst,
+                sourceCountFst, isSelectBig, partOffsetMapFst);
 
-        executorService.shutdown();
-        try {
-            executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            logger.error("Thread Pool shutdown has been interrupted!");
+        // 3.4. initial and start fetch threads
+        Thread[] fetchRunners1 = new Thread[fetchThreadCnt];
+        for (int i = 0; i < fetchRunners1.length; i++) {
+            fetchRunners1[i] = new Thread(new FetchRequestRunner(
+                    consumer1, partOffsetMapFst, consumeCount), "_fetch_runner_" + i);
+        }
+        for (Thread thread : fetchRunners1) {
+            thread.start();
+        }
+
+        // 3.5. wait consume data
+        ThreadUtils.sleep(2 * 60 * 1000);
+        logger.info("The first consumption has finished!");
+
+        // 3.6. shutdown consumer
+        consumer1.shutdown();
+        for (Thread thread : fetchRunners1) {
+            thread.join();
         }
-    }
 
-    public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap,
-                          Map<String, Long> partOffsetMap) throws TubeClientException {
+        // 4. Start the consumer group for the second consumption
+        // 4.1 set the bootstrap offset, here we set consumption from 0
+        logger.info("The second consumption begin!");
+        String sessionKeySec = "test_consume_Second";
+        int sourceCountSec = 1;
+        ConcurrentHashMap<String, Long> partOffsetMapSec =
+                new ConcurrentHashMap<>();
+        for (String partKey : partOffsetMapFst.keySet()) {
+            partOffsetMapSec.put(partKey, 0L);
+        }
+        // 4.2. build consumer object
+        PullMessageConsumer consumer2 =
+                createPullConsumer(masterServers, groupName);
+        // 4.3. Set the Topic and the filter item set corresponding to the consumption
         for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
-            messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
+            consumer2.subscribe(entry.getKey(), entry.getValue());
         }
-        String sessionKey = "test_reset2";
-        int consumerCount = 2;
-        boolean isSelectBig = false;
-        messagePullConsumer.completeSubscribe(sessionKey,
-                consumerCount, isSelectBig, partOffsetMap);
-    }
+        consumer2.completeSubscribe(sessionKeySec,
+                sourceCountSec, isSelectBig, partOffsetMapSec);
 
-    public ConsumerResult getMessage() throws TubeClientException {
-        return messagePullConsumer.getMessage();
+        // 4.4. initial and start fetch threads
+        Thread[] fetchRunners2 = new Thread[fetchThreadCnt];
+        for (int i = 0; i < fetchRunners2.length; i++) {
+            fetchRunners2[i] = new Thread(new FetchRequestRunner(
+                    consumer2, partOffsetMapSec, consumeCount), "_fetch_runner_" + i);
+        }
+        for (Thread thread : fetchRunners2) {
+            thread.start();
+        }
     }
 
-    public ConsumerResult confirmConsume(
-        String confirmContext,
-        boolean isConsumed
-    ) throws TubeClientException {
-        return messagePullConsumer.confirmConsume(confirmContext, isConsumed);
+    // Creates a PullMessageConsumer, reusing one cached session factory per master cluster
+    private static PullMessageConsumer createPullConsumer(
+            String masterHostAndPorts, String groupName) throws Exception {
+        // 1. initialize the consumer configuration
+        ConsumerConfig consumerConfig =
+                new ConsumerConfig(masterHostAndPorts, groupName);
+        // 1.1 consume from the latest position if the consumer group consumes for the first time
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        // 2. look up the session factory cached for this master cluster;
+        //    create and register a TubeMultiSessionFactory if none exists (the losing racer is shut down)
+        MasterInfo masterInfo = consumerConfig.getMasterInfo();
+        MessageSessionFactory sessionFactory =
+                multSessFtyMap.get(masterInfo.getMasterClusterStr());
+        if (sessionFactory == null) {
+            MessageSessionFactory tmpSessionFactory =
+                    new TubeMultiSessionFactory(consumerConfig);
+            sessionFactory = multSessFtyMap.putIfAbsent(
+                    masterInfo.getMasterClusterStr(), tmpSessionFactory);
+            if (sessionFactory == null) {
+                sessionFactory = tmpSessionFactory;
+            } else {
+                tmpSessionFactory.shutdown();
+            }
+        }
+        // 3. create and return the PullMessageConsumer object
+        return sessionFactory.createPullConsumer(consumerConfig);
     }
 
-    public Map<String, ConsumeOffsetInfo> getCurrPartitionOffsetMap() throws TubeClientException {
-        return messagePullConsumer.getCurConsumedPartitions();
+    // Fetch runner: pulls messages, records per-partition offsets and confirms the consumption
+    private static class FetchRequestRunner implements Runnable {
+
+        final PullMessageConsumer pullConsumer;
+        final ConcurrentHashMap<String, Long> partOffsetMap;
+        final int consumeCount;
+
+        FetchRequestRunner(PullMessageConsumer messageConsumer,
+                           ConcurrentHashMap<String, Long> partOffsetMap,
+                           int msgCount) {
+            this.pullConsumer = messageConsumer;
+            this.partOffsetMap = partOffsetMap;
+            this.consumeCount = msgCount;
+        }
+
+        @Override
+        public void run() {
+            ConsumerResult csmResult;
+            ConsumerResult cfmResult;
+            int getCount = consumeCount;
+            // wait until the partitions are ready or the consumer has been shut down
+            do {
+                if (pullConsumer.isPartitionsReady(5000)
+                        || pullConsumer.isShutdown()) {
+                    break;
+                }
+            } while (true);
+            // consume messages
+            do {
+                try {
+                    // 1. exit the loop once the consumer is shutdown
+                    if (pullConsumer.isShutdown()) {
+                        logger.warn("Consumer is shutdown!");
+                        break;
+                    }
+                    // 2. get messages from server
+                    csmResult = pullConsumer.getMessage();
+                    if (csmResult.isSuccess()) {
+                        // 2.1 process message if getMessage() return success
+                        List<Message> messageList = csmResult.getMessageList();
+                        if (messageList != null && !messageList.isEmpty()) {
+                            msgRcvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+                        }
+                        // 2.2 store the offset of processing message
+                        //       the offset returned by GetMessage() represents the initial offset of this request
+                        //       if consumer group is pure Pull mode, the initial offset can be saved;
+                        //       if not, we have to use the return value of confirmConsume()
+                        partOffsetMap.put(csmResult.getPartitionKey(), csmResult.getCurrOffset());
+                        // 2.3 confirm consume result
+                        cfmResult = pullConsumer.confirmConsume(
+                                csmResult.getConfirmContext(), true);
+                        if (cfmResult.isSuccess()) {
+                            // overwrite with the offset returned by confirmConsume(), not the fetch offset
+                            partOffsetMap.put(csmResult.getPartitionKey(), cfmResult.getCurrOffset());
+                        }
+                    }
+                    // 3. stop after consumeCount fetch rounds (counts getMessage() calls, not messages)
+                    if (consumeCount > 0) {
+                        if (--getCount <= 0) {
+                            logger.info("Consumer has consumed {} messages!", consumeCount);
+                            break;
+                        }
+                    }
+                } catch (Throwable e) {
+                    // Best-effort demo loop: any exception is ignored and the next round is tried
+                }
+            } while (true);
+            logger.info("The fetch thread has exited!");
+        }
     }
 }
 
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
new file mode 100644
index 0000000..94c8d15
--- /dev/null
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.example;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.Executor;
+import org.apache.inlong.tubemq.client.common.PeerInfo;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.MessageListener;
+import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+
+/**
+ * This demo shows how to consume messages sequentially in Push mode.
+ *
+ * Consumer supports subscribing to multiple topics in one consumer group. Messages from the
+ * subscriptions are sent back to business logic via the callback {@link MessageListener}. It is
+ * highly recommended NOT to perform any blocking operation inside the callback.
+ *
+ * As for consumption control of {@link PushMessageConsumer}, business logic is able to monitor
+ * the current state and adjust consumption by
+ *
+ * <ul>
+ *     <li>call {@link PushMessageConsumer#pauseConsume()} to pause consumption when high water mark exceeded.</li>
+ *     <li>call {@link PushMessageConsumer#resumeConsume()} to resume consumption</li>
+ * </ul>
+ */
+public final class MessagePushConsumerExample {
+
+    private static final MsgSendReceiveStats msgRcvStats =
+            new MsgSendReceiveStats(false);
+    private static MessageSessionFactory sessionFactory;
+    private static final Map<String, PushMessageConsumer> consumerMap = new HashMap<>();
+
+    // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+    //     the master address(es) to connect to;
+    // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+    //     the topic(s) (and filter condition set) to consume on.
+    // The 3rd parameter groupName is the name of consumer group
+    // The 4th parameter clientCount is the amount of consumers (values <= 0 are treated as 1)
+    // The 5th parameter fetchThreadCnt (optional) is the count of fetch threads
+    public static void main(String[] args) throws Throwable {
+        // 1. get and initialize parameters
+        final String masterServers = args[0];
+        final String subTopicAndFiterItems = args[1];
+        final String groupName = args[2];
+        int clientCount = Integer.parseInt(args[3]);
+        if (clientCount <= 0) {
+            clientCount = 1;
+        }
+        int paraFetchThreadCnt = -1;  // non-positive: setPushFetchThreadCnt() is not called below
+        if (args.length > 4) {
+            paraFetchThreadCnt = Integer.parseInt(args[4]);
+        }
+        final int fetchThreadCnt = paraFetchThreadCnt;
+        final Map<String, TreeSet<String>> topicAndFiltersMap =
+                MixedUtils.parseTopicParam(subTopicAndFiterItems);
+
+        // 2. initialize the configuration and build the session factory object
+        ConsumerConfig consumerConfig =
+                new ConsumerConfig(masterServers, groupName);
+        // 2.1. set consume from latest position if the consumer group is first consume
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        // 2.2. set the fetch thread count of push consumer
+        if (fetchThreadCnt > 0) {
+            consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
+        }
+        // 2.3. build session factory object
+        sessionFactory = new TubeSingleSessionFactory(consumerConfig);
+
+        // 3. build and start consumer objects
+        for (int i = 0; i < clientCount; i++) {
+            // 3.1. build consumer object
+            PushMessageConsumer consumer =
+                    sessionFactory.createPushConsumer(consumerConfig);
+            // 3.2. set subscribed topic and Listener
+            for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+                MessageListener messageListener = new DefaultMessageListener(entry.getKey());
+                consumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
+            }
+            // 3.3 start consumer
+            consumer.completeSubscribe();
+            // 3.4 store consumer object
+            consumerMap.put(consumer.getConsumerId(), consumer);
+        }
+
+        // 4. initialize and start the statistic thread
+        Thread statisticThread =
+                new Thread(msgRcvStats, "Receive Statistic Thread");
+        statisticThread.start();
+
+        // 5. Resource cleanup when exiting the service
+        //
+        // 5.1 shutdown consumers
+        // for (PushMessageConsumer consumer : consumerMap.values()) {
+        //     consumer.shutdown();
+        // }
+        // 5.2 shutdown session factory
+        // sessionFactory.shutdown();
+        // 5.3 shutdown statistic thread
+        // msgRcvStats.stopStats();
+    }
+
+    // Message callback processing class.
+    // After the SDK receives the message, it will pass the message back to the business layer
+    // for message processing by calling the receiveMessages() API of this class
+    public static class DefaultMessageListener implements MessageListener {
+
+        private final String topic;
+
+        public DefaultMessageListener(String topic) {
+            this.topic = topic;
+        }
+
+        @Override
+        public void receiveMessages(PeerInfo peerInfo, List<Message> messages) {
+            if (messages != null && !messages.isEmpty()) {
+                msgRcvStats.addMsgCount(this.topic, messages.size());
+            }
+        }
+
+        @Override
+        public Executor getExecutor() {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
similarity index 69%
rename from inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
rename to inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
index 9074026..cfc8a55 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
@@ -28,19 +28,24 @@ import org.slf4j.LoggerFactory;
 /**
  * This demo shows how to collect and report message received statistics.
  */
-public class MsgRecvStats implements Runnable {
-    private static final Logger logger = LoggerFactory.getLogger(MsgRecvStats.class);
+public class MsgSendReceiveStats implements Runnable {
+    private final boolean isProducer;
+    private static final Logger logger = LoggerFactory.getLogger(MsgSendReceiveStats.class);
     private static final ConcurrentHashMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
     private static final ConcurrentHashMap<String, AtomicLong> befCountMap = new ConcurrentHashMap<>();
-    private AtomicBoolean isStarted = new AtomicBoolean(false);
+    private final AtomicBoolean isStarted = new AtomicBoolean(true);
+
+    public MsgSendReceiveStats(boolean isProducer) {
+        this.isProducer = isProducer;
+    }
 
     @Override
     public void run() {
+        // Indicator output every 30 seconds
         while (isStarted.get()) {
             try {
                 for (Map.Entry<String, AtomicLong> entry : counterMap.entrySet()) {
                     long currCount = entry.getValue().get();
-
                     AtomicLong befCount = befCountMap.get(entry.getKey());
                     if (befCount == null) {
                         AtomicLong tmpCount = new AtomicLong(0);
@@ -49,10 +54,14 @@ public class MsgRecvStats implements Runnable {
                             befCount = tmpCount;
                         }
                     }
-
-                    logger.info("********* Current {} Message receive count is {}, dlt is {}",
-                        new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
-
+                    if (isProducer) {
+                        logger.info("********* Current {} Message sent count is {}, dlt is {}",
+                                new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
+                    } else {
+                        logger.info("********* Current {} Message received count is {}, dlt is {}",
+                                new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
+                    }
+                    befCountMap.get(entry.getKey()).set(currCount);
                 }
             } catch (Throwable t) {
                 // ignore
@@ -71,9 +80,13 @@ public class MsgRecvStats implements Runnable {
                     currCount = tmpCount;
                 }
             }
-
-            if (currCount.addAndGet(msgCnt) % 500 == 0) {
-                logger.info("Receive messages:" + currCount.get());
+            // Indicator output every 1000
+            if (currCount.addAndGet(msgCnt) % 1000 == 0) {
+                if (isProducer) {
+                    logger.info("Sent " + topicName + " messages:" + currCount.get());
+                } else {
+                    logger.info("Received " + topicName + " messages:" + currCount.get());
+                }
             }
         }
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
index fbf19fd..0ac63a5 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
@@ -19,7 +19,6 @@ package org.apache.inlong.tubemq.server.tools.cli;
 
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +63,7 @@ public class CliProducer extends CliAbstractBase {
     // sent data content
     private static byte[] sentData;
     private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
-    private final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
+    private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
     private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
     private final Map<MessageProducer, MsgSender> producerMap = new HashMap<>();
     // cli parameters
@@ -181,21 +180,13 @@ public class CliProducer extends CliAbstractBase {
 
     // initial tubemq client order by caller required
     public void initTask() throws Exception {
-        // initial sent data
-        sentData = MixedUtils.buildTestData(msgDataSize);
         // initial client configure
         TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
         clientConfig.setRpcTimeoutMs(rpcTimeoutMs);
+        // initial sent data
+        sentData = MixedUtils.buildTestData(msgDataSize);
         // initial topic send round
-        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
-            if (entry.getValue().isEmpty()) {
-                topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
-            } else {
-                for (String filter : entry.getValue()) {
-                    topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
-                }
-            }
-        }
+        topicSendRounds = MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
         // initial send thread service
         sendExecutorService =
                 Executors.newFixedThreadPool(sendThreadCnt, new ThreadFactory() {
@@ -265,13 +256,9 @@ public class CliProducer extends CliAbstractBase {
             while (msgCount < 0 || sentCount < msgCount) {
                 roundIndex = (int) (sentCount++ % topicAndCondCnt);
                 try {
-                    long millis = System.currentTimeMillis();
                     Tuple2<String, String> target = topicSendRounds.get(roundIndex);
-                    Message message = new Message(target.getF0(), sentData);
-                    if (target.getF1() != null) {
-                        // if include filter, add filter item
-                        message.putSystemHeader(target.getF1(), sdf.format(new Date(millis)));
-                    }
+                    Message message = MixedUtils.buildMessage(
+                            target.getF0(), target.getF1(), sentData, sentCount, sdf);
                     // use sync or async process
                     if (syncProduction) {
                         MessageSentResult procResult =
@@ -293,15 +280,7 @@ public class CliProducer extends CliAbstractBase {
                 // Limit sending flow control to avoid frequent errors
                 // caused by too many inflight messages being sent
                 if (!withoutDelay) {
-                    if (sentCount % 5000 == 0) {
-                        ThreadUtils.sleep(3000);
-                    } else if (sentCount % 4000 == 0) {
-                        ThreadUtils.sleep(2000);
-                    } else if (sentCount % 2000 == 0) {
-                        ThreadUtils.sleep(800);
-                    } else if (sentCount % 1000 == 0) {
-                        ThreadUtils.sleep(400);
-                    }
+                    MixedUtils.coolSending(sentCount);
                 }
             }
             // finished, close client