You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:11 UTC

[incubator-tubemq] 13/49: [TUBEMQ-449]Adjust Example implementation (#348)

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 8acad06b615063659219f8097de0478b76a10011
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Dec 14 18:21:36 2020 +0800

    [TUBEMQ-449]Adjust Example implementation (#348)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../org/apache/tubemq/corebase/utils/Tuple2.java   |  53 +++++++
 .../tubemq/example/MAMessageProducerExample.java   | 158 +++++++++++----------
 .../tubemq/example/MessageConsumerExample.java     |  60 ++++----
 .../tubemq/example/MessageProducerExample.java     | 131 +++++++++--------
 .../tubemq/example/MessagePullConsumerExample.java |  63 ++++----
 .../example/MessagePullSetConsumerExample.java     |  62 ++++----
 .../tubemq/server/tools/cli/CliProducer.java       |  30 ++--
 7 files changed, 292 insertions(+), 265 deletions(-)

diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
new file mode 100644
index 0000000..f5626f8
--- /dev/null
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.corebase.utils;
+
+public class Tuple2<T0, T1> {
+
+    /** Field 0 of the tuple. */
+    public T0 f0 = null;
+    /** Field 1 of the tuple. */
+    public T1 f1 = null;
+
+    /**
+     * Creates a new tuple where all fields are null.
+     */
+    public Tuple2() {
+
+    }
+
+    /**
+     * Creates a new tuple with field 0 specified.
+     *
+     * @param value0 The value for field 0
+     */
+    public Tuple2(T0 value0) {
+        this.f0 = value0;
+    }
+
+    /**
+     * Creates a new tuple and assigns the given values to the tuple's fields.
+     *
+     * @param value0 The value for field 0
+     * @param value1 The value for field 1
+     */
+    public Tuple2(T0 value0, T1 value1) {
+        this.f0 = value0;
+        this.f1 = value1;
+    }
+}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
index f76b077..ea546c9 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
@@ -20,12 +20,10 @@ package org.apache.tubemq.example;
 import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -41,8 +39,9 @@ import org.apache.tubemq.client.producer.MessageProducer;
 import org.apache.tubemq.client.producer.MessageSentCallback;
 import org.apache.tubemq.client.producer.MessageSentResult;
 import org.apache.tubemq.corebase.Message;
-import org.apache.tubemq.corebase.TErrCodeConstants;
+import org.apache.tubemq.corebase.utils.MixedUtils;
 import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,19 +51,24 @@ import org.slf4j.LoggerFactory;
  * to improve throughput from client to broker.
  */
 public class MAMessageProducerExample {
-    private static final Logger logger = LoggerFactory.getLogger(MAMessageProducerExample.class);
+    private static final Logger logger =
+            LoggerFactory.getLogger(MAMessageProducerExample.class);
+    private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
     private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
+    private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0);
+    private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0);
+
     private static final List<MessageProducer> PRODUCER_LIST = new ArrayList<>();
     private static final int MAX_PRODUCER_NUM = 100;
     private static final int SESSION_FACTORY_NUM = 10;
 
-    private static Set<String> topicSet;
+    private static Map<String, TreeSet<String>> topicAndFiltersMap;
+    private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
     private static int msgCount;
-    private static int producerCount;
+    private static int clientCount;
     private static byte[] sendData;
+    private static AtomicLong filterMsgCount = new AtomicLong(0);
 
-    private final String[] arrayKey = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh"};
-    private final Set<String> filters = new TreeSet<>();
     private final Map<MessageProducer, Sender> producerMap = new HashMap<>();
     private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
     private final ExecutorService sendExecutorService =
@@ -76,13 +80,9 @@ public class MAMessageProducerExample {
             });
     private final AtomicInteger producerIndex = new AtomicInteger(0);
 
-    private int keyCount = 0;
-    private int sentCount = 0;
 
-    public MAMessageProducerExample(String masterHostAndPort) throws Exception {
-        this.filters.add("aaa");
-        this.filters.add("bbb");
 
+    public MAMessageProducerExample(String masterHostAndPort) throws Exception {
         TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
         for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
             this.sessionFactoryList.add(new TubeMultiSessionFactory(clientConfig));
@@ -90,40 +90,51 @@ public class MAMessageProducerExample {
     }
 
     public static void main(String[] args) {
-        final String masterHostAndPort = args[0];
-
+        // get call parameters
+        final String masterServers = args[0];
         final String topics = args[1];
-        final List<String> topicList = Arrays.asList(topics.split(","));
-
-        topicSet = new TreeSet<>(topicList);
-
         msgCount = Integer.parseInt(args[2]);
-        producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
-
-        logger.info("MAMessageProducerExample.main started...");
-
-        final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
+        clientCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
+        topicAndFiltersMap = MixedUtils.parseTopicParam(topics);
+        // initialize the topic send rounds
+        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
+            } else {
+                for (String filter : entry.getValue()) {
+                    topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
+                }
+            }
+        }
+        // build message's body content
+        final byte[] transmitData =
+                StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
         final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
-
         while (dataBuffer.hasRemaining()) {
             int offset = dataBuffer.arrayOffset();
-            dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length));
+            dataBuffer.put(transmitData, offset,
+                    Math.min(dataBuffer.remaining(), transmitData.length));
         }
-
         dataBuffer.flip();
         sendData = dataBuffer.array();
+        // print started log
+        logger.info("MAMessageProducerExample.main started...");
 
         try {
-            MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort);
-
+            // initialize producer objects
+            MAMessageProducerExample messageProducer =
+                    new MAMessageProducerExample(masterServers);
             messageProducer.startService();
-
-            while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) {
-                Thread.sleep(1000);
+            // wait until the sent message count reaches the required count
+            while (TOTAL_COUNTER.get() < msgCount * clientCount) {
+                logger.info("Sending, total messages is {}, filter messages is {}",
+                        SENT_SUCC_COUNTER.get(), filterMsgCount.get());
+                Thread.sleep(5000);
             }
+            logger.info("Finished, total messages is {}, filter messages is {}",
+                    SENT_SUCC_COUNTER.get(), filterMsgCount.get());
             messageProducer.producerMap.clear();
             messageProducer.shutdown();
-
         } catch (TubeClientException e) {
             logger.error("TubeClientException: ", e);
         } catch (Throwable e) {
@@ -137,7 +148,7 @@ public class MAMessageProducerExample {
     }
 
     private void startService() throws TubeClientException {
-        for (int i = 0; i < producerCount; i++) {
+        for (int i = 0; i < clientCount; i++) {
             PRODUCER_LIST.add(createProducer());
         }
 
@@ -169,44 +180,44 @@ public class MAMessageProducerExample {
         public void run() {
             SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
             try {
-                producer.publish(topicSet);
+                producer.publish(topicAndFiltersMap.keySet());
             } catch (Throwable t) {
                 logger.error("publish exception: ", t);
             }
-            for (int i = 0; i < msgCount; i++) {
+            long sentCount = 0;
+            int roundIndex = 0;
+            int targetCnt = topicSendRounds.size();
+            while (msgCount < 0 || sentCount < msgCount) {
                 long millis = System.currentTimeMillis();
-                for (String topic : topicSet) {
-                    try {
-                        Message message = new Message(topic, sendData);
-                        message.setAttrKeyVal("index", String.valueOf(1));
-                        message.setAttrKeyVal("dataTime", String.valueOf(millis));
-
-                        String keyCode = arrayKey[sentCount++ % arrayKey.length];
-
-                        // date format is accurate to minute, not to second
-                        message.putSystemHeader(keyCode, sdf.format(new Date(millis)));
-                        if (filters.contains(keyCode)) {
-                            keyCount++;
-                        }
-
-                        // next line sends message synchronously, which is not recommended
-                        //producer.sendMessage(message);
-
-                        // send message asynchronously, recommended
-                        producer.sendMessage(message, new DefaultSendCallback());
-                    } catch (Throwable e1) {
-                        logger.error("sendMessage exception: ", e1);
-                    }
-
-                    if (i % 5000 == 0) {
-                        ThreadUtils.sleep(3000);
-                    } else if (i % 4000 == 0) {
-                        ThreadUtils.sleep(2000);
-                    } else if (i % 2000 == 0) {
-                        ThreadUtils.sleep(800);
-                    } else if (i % 1000 == 0) {
-                        ThreadUtils.sleep(400);
-                    }
+                roundIndex = (int) (sentCount++ % targetCnt);
+                Tuple2<String, String> target = topicSendRounds.get(roundIndex);
+                Message message = new Message(target.f0, sendData);
+                message.setAttrKeyVal("index", String.valueOf(sentCount));
+                message.setAttrKeyVal("dataTime", String.valueOf(millis));
+                if (target.f1 != null) {
+                    filterMsgCount.incrementAndGet();
+                    message.putSystemHeader(target.f1, sdf.format(new Date(millis)));
+                }
+                try {
+                    // next line sends message synchronously, which is not recommended
+                    //producer.sendMessage(message);
+                    // send message asynchronously, recommended
+                    producer.sendMessage(message, new DefaultSendCallback());
+                } catch (Throwable e1) {
+                    TOTAL_COUNTER.incrementAndGet();
+                    SENT_EXCEPT_COUNTER.incrementAndGet();
+                    logger.error("sendMessage exception: ", e1);
+                }
+                TOTAL_COUNTER.incrementAndGet();
+                // only for testing: sleep periodically to limit the in-flight message count
+                if (sentCount % 5000 == 0) {
+                    ThreadUtils.sleep(3000);
+                } else if (sentCount % 4000 == 0) {
+                    ThreadUtils.sleep(2000);
+                } else if (sentCount % 2000 == 0) {
+                    ThreadUtils.sleep(800);
+                } else if (sentCount % 1000 == 0) {
+                    ThreadUtils.sleep(400);
                 }
             }
             try {
@@ -221,19 +232,18 @@ public class MAMessageProducerExample {
     private class DefaultSendCallback implements MessageSentCallback {
         @Override
         public void onMessageSent(MessageSentResult result) {
+            TOTAL_COUNTER.incrementAndGet();
             if (result.isSuccess()) {
-                if (SENT_SUCC_COUNTER.incrementAndGet() % 1000 == 0) {
-                    logger.info("Send {} message, keyCount is {}", SENT_SUCC_COUNTER.get(), keyCount);
-                }
+                SENT_SUCC_COUNTER.incrementAndGet();
             } else {
-                if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW) {
-                    logger.error("Send message failed!" + result.getErrMsg());
-                }
+                SENT_FAIL_COUNTER.incrementAndGet();
             }
         }
 
         @Override
         public void onException(Throwable e) {
+            TOTAL_COUNTER.incrementAndGet();
+            SENT_EXCEPT_COUNTER.incrementAndGet();
             logger.error("Send message error!", e);
         }
     }
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
index 0252d16..866be77 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
@@ -17,8 +17,6 @@
 
 package org.apache.tubemq.example;
 
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
@@ -35,6 +33,7 @@ import org.apache.tubemq.client.exception.TubeClientException;
 import org.apache.tubemq.client.factory.MessageSessionFactory;
 import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.utils.MixedUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,59 +54,48 @@ import org.slf4j.LoggerFactory;
  */
 public final class MessageConsumerExample {
 
-    private static final Logger logger = LoggerFactory.getLogger(MessageConsumerExample.class);
+    private static final Logger logger =
+            LoggerFactory.getLogger(MessageConsumerExample.class);
     private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
 
     private final PushMessageConsumer messageConsumer;
     private final MessageSessionFactory messageSessionFactory;
 
-    public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception {
+    public MessageConsumerExample(String masterHostAndPort,
+                                  String group, int fetchThreadCnt) throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
         consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
-        if (fetchCount > 0) {
-            consumerConfig.setPushFetchThreadCnt(fetchCount);
+        if (fetchThreadCnt > 0) {
+            consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
         }
         this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
     }
 
+
     public static void main(String[] args) {
-        final String masterHostAndPort = args[0];
+        final String masterServers = args[0];
         final String topics = args[1];
         final String group = args[2];
-        final int consumerCount = Integer.parseInt(args[3]);
-        int fetchCount = -1;
+        final int clientCount = Integer.parseInt(args[3]);
+        int threadCnt = -1;
         if (args.length > 5) {
-            fetchCount = Integer.parseInt(args[4]);
-        }
-        final Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
-
-        String[] topicTidsList = topics.split(",");
-        for (String topicTids : topicTidsList) {
-            String[] topicTidStr = topicTids.split(":");
-            TreeSet<String> tids = null;
-            if (topicTidStr.length > 1) {
-                String tidsStr = topicTidStr[1];
-                String[] tidsSet = tidsStr.split(";");
-                if (tidsSet.length > 0) {
-                    tids = new TreeSet<>(Arrays.asList(tidsSet));
-                }
-            }
-            topicTidsMap.put(topicTidStr[0], tids);
+            threadCnt = Integer.parseInt(args[4]);
         }
-        final int startFetchCount = fetchCount;
-        final ExecutorService executorService = Executors.newCachedThreadPool();
-        for (int i = 0; i < consumerCount; i++) {
+        final int fetchThreadCnt = threadCnt;
+        final Map<String, TreeSet<String>> topicAndFiltersMap =
+                MixedUtils.parseTopicParam(topics);
+        final ExecutorService executorService =
+                Executors.newCachedThreadPool();
+        for (int i = 0; i < clientCount; i++) {
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        MessageConsumerExample messageConsumer = new MessageConsumerExample(
-                                masterHostAndPort,
-                                group,
-                                startFetchCount
-                        );
-                        messageConsumer.subscribe(topicTidsMap);
+                        MessageConsumerExample messageConsumer =
+                                new MessageConsumerExample(masterServers,
+                                        group, fetchThreadCnt);
+                        messageConsumer.subscribe(topicAndFiltersMap);
                     } catch (Exception e) {
                         logger.error("Create consumer failed!", e);
                     }
@@ -126,8 +114,8 @@ public final class MessageConsumerExample {
         msgRecvStats.stopStats();
     }
 
-    public void subscribe(Map<String, TreeSet<String>> topicTidsMap) throws TubeClientException {
-        for (Map.Entry<String, TreeSet<String>> entry : topicTidsMap.entrySet()) {
+    public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
+        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
             MessageV2Listener messageListener = new DefaultMessageListener(entry.getKey());
             messageConsumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
         }
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
index 57d0ef0..7c29a63 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
@@ -19,7 +19,7 @@ package org.apache.tubemq.example;
 
 import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +36,9 @@ import org.apache.tubemq.client.producer.MessageProducer;
 import org.apache.tubemq.client.producer.MessageSentCallback;
 import org.apache.tubemq.client.producer.MessageSentResult;
 import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.utils.MixedUtils;
 import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,32 +50,40 @@ import org.slf4j.LoggerFactory;
  */
 public final class MessageProducerExample {
 
-    private static final Logger logger = LoggerFactory.getLogger(MessageProducerExample.class);
-    private static final ConcurrentHashMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
+    private static final Logger logger =
+            LoggerFactory.getLogger(MessageProducerExample.class);
+    private static final ConcurrentHashMap<String, AtomicLong> counterMap =
+            new ConcurrentHashMap<>();
 
-    private final String[] arrayKey = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh"};
-    private final Set<String> filters = new TreeSet<>();
     private final MessageProducer messageProducer;
     private final MessageSessionFactory messageSessionFactory;
 
-    private int keyCount = 0;
-    private int sentCount = 0;
-
-    public MessageProducerExample(String masterHostAndPort) throws Exception {
-        filters.add("aaa");
-        filters.add("bbb");
-
-        TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
+    public MessageProducerExample(String masterServers) throws Exception {
+        TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
         this.messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
         this.messageProducer = messageSessionFactory.createProducer();
     }
 
     public static void main(String[] args) {
-        final String masterHostAndPort = args[0];
+        // get and initialize parameters
+        final String masterServers = args[0];
         final String topics = args[1];
-        final List<String> topicList = Arrays.asList(topics.split(","));
-        final int count = Integer.parseInt(args[2]);
-
+        final long msgCount = Long.parseLong(args[2]);
+        final Map<String, TreeSet<String>> topicAndFiltersMap =
+                MixedUtils.parseTopicParam(topics);
+        // initialize send targets
+        final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
+        // initialize the topic send rounds
+        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
+            } else {
+                for (String filter : entry.getValue()) {
+                    topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
+                }
+            }
+        }
+        // initialize the data to send
         String body = "This is a test message from single-session-factory.";
         byte[] bodyBytes = StringUtils.getBytesUtf8(body);
         final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
@@ -82,32 +92,40 @@ public final class MessageProducerExample {
             dataBuffer.put(bodyBytes, offset, Math.min(dataBuffer.remaining(), bodyBytes.length));
         }
         dataBuffer.flip();
+        // send messages
         try {
-            MessageProducerExample messageProducer = new MessageProducerExample(masterHostAndPort);
-            messageProducer.publishTopics(topicList);
-            for (int i = 0; i < count; i++) {
-                for (String topic : topicList) {
-                    try {
-                        // next line sends message synchronously, which is not recommended
-                        // messageProducer.sendMessage(topic, body.getBytes());
-
-                        // send message asynchronously, recommended
-                        messageProducer.sendMessageAsync(
-                            i,
-                            topic,
-                            dataBuffer.array(),
-                            messageProducer.new DefaultSendCallback()
-                        );
-                    } catch (Throwable e1) {
-                        logger.error("Send Message throw exception  ", e1);
-                    }
+            long sentCount = 0;
+            int roundIndex = 0;
+            int targetCnt = topicSendRounds.size();
+            MessageProducerExample messageProducer =
+                    new MessageProducerExample(masterServers);
+            messageProducer.publishTopics(topicAndFiltersMap.keySet());
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
+            while (msgCount < 0 || sentCount < msgCount) {
+                roundIndex = (int) (sentCount++ % targetCnt);
+                Tuple2<String, String> target = topicSendRounds.get(roundIndex);
+                Message message = new Message(target.f0, body.getBytes());
+                long currTimeMillis = System.currentTimeMillis();
+                message.setAttrKeyVal("index", String.valueOf(sentCount));
+                message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
+                if (target.f1 != null) {
+                    message.putSystemHeader(target.f1, sdf.format(new Date(currTimeMillis)));
                 }
-
-                if (i % 20000 == 0) {
+                try {
+                    // 1.1 next line sends message synchronously, which is not recommended
+                    // messageProducer.sendMessage(message);
+                    // 1.2 send message asynchronously, recommended
+                    messageProducer.sendMessageAsync(message,
+                            messageProducer.new DefaultSendCallback());
+                } catch (Throwable e1) {
+                    logger.error("Send Message throw exception  ", e1);
+                }
+                // only for testing: sleep periodically to limit the in-flight message count
+                if (sentCount % 20000 == 0) {
                     ThreadUtils.sleep(4000);
-                } else if (i % 10000 == 0) {
+                } else if (sentCount % 10000 == 0) {
                     ThreadUtils.sleep(2000);
-                } else if (i % 2500 == 0) {
+                } else if (sentCount % 2500 == 0) {
                     ThreadUtils.sleep(300);
                 }
             }
@@ -124,51 +142,33 @@ public final class MessageProducerExample {
         }
     }
 
-    public void publishTopics(List<String> topicList) throws TubeClientException {
-        this.messageProducer.publish(new TreeSet<>(topicList));
+    public void publishTopics(Set<String> topicSet) throws TubeClientException {
+        this.messageProducer.publish(topicSet);
     }
 
     /**
      * Send message synchronous.
      */
-    public void sendMessage(String topic, byte[] body) {
+    public void sendMessage(Message message) {
         // date format is accurate to minute, not to second
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
-        long currTimeMillis = System.currentTimeMillis();
-        Message message = new Message(topic, body);
-        message.setAttrKeyVal("index", String.valueOf(1));
-        message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
-        message.putSystemHeader("test", sdf.format(new Date(currTimeMillis)));
         try {
             MessageSentResult result = messageProducer.sendMessage(message);
             if (!result.isSuccess()) {
-                logger.error("Send message failed!" + result.getErrMsg());
+                logger.error("Sync-send message failed!" + result.getErrMsg());
             }
         } catch (TubeClientException | InterruptedException e) {
-            logger.error("Send message failed!", e);
+            logger.error("Sync-send message failed!", e);
         }
     }
 
     /**
      * Send message asynchronous. More efficient and recommended.
      */
-    public void sendMessageAsync(int id, String topic, byte[] body, MessageSentCallback callback) {
-        Message message = new Message(topic, body);
-
-        // date format is accurate to minute, not to second
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
-        long currTimeMillis = System.currentTimeMillis();
-        message.setAttrKeyVal("index", String.valueOf(1));
-        String keyCode = arrayKey[sentCount++ % arrayKey.length];
-        message.putSystemHeader(keyCode, sdf.format(new Date(currTimeMillis)));
-        if (filters.contains(keyCode)) {
-            keyCount++;
-        }
+    public void sendMessageAsync(Message message, MessageSentCallback callback) {
         try {
-            message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
             messageProducer.sendMessage(message, callback);
         } catch (TubeClientException | InterruptedException e) {
-            logger.error("Send message failed!", e);
+            logger.error("Async-send message failed!", e);
         }
     }
 
@@ -187,9 +187,8 @@ public final class MessageProducerExample {
                         currCount = tmpCount;
                     }
                 }
-
                 if (currCount.incrementAndGet() % 1000 == 0) {
-                    logger.info("Send " + topicName + " " + currCount.get() + " message, keyCount is " + keyCount);
+                    logger.info("Send " + topicName + " " + currCount.get() + " message!");
                 }
             } else {
                 logger.error("Send message failed!" + result.getErrMsg());
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
index 16c45b2..36fcaa2 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
@@ -17,9 +17,10 @@
 
 package org.apache.tubemq.example;
 
-import java.util.Arrays;
+import static org.apache.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeSet;
 import org.apache.tubemq.client.config.ConsumerConfig;
 import org.apache.tubemq.client.consumer.ConsumeOffsetInfo;
 import org.apache.tubemq.client.consumer.ConsumePosition;
@@ -29,9 +30,11 @@ import org.apache.tubemq.client.exception.TubeClientException;
 import org.apache.tubemq.client.factory.MessageSessionFactory;
 import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.utils.MixedUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * This demo shows how to consume message by pull.
  *
@@ -42,7 +45,8 @@ import org.slf4j.LoggerFactory;
  */
 public final class MessagePullConsumerExample {
 
-    private static final Logger logger = LoggerFactory.getLogger(MessagePullConsumerExample.class);
+    private static final Logger logger =
+            LoggerFactory.getLogger(MessagePullConsumerExample.class);
     private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
 
     private final PullMessageConsumer messagePullConsumer;
@@ -56,39 +60,37 @@ public final class MessagePullConsumerExample {
     }
 
     public static void main(String[] args) throws Throwable {
-        final String masterHostAndPort = args[0];
+        // get and initialize parameters
+        final String masterServers = args[0];
         final String topics = args[1];
         final String group = args[2];
-        final int consumeCount = Integer.parseInt(args[3]);
-
-        final MessagePullConsumerExample messageConsumer = new MessagePullConsumerExample(
-            masterHostAndPort,
-            group
-        );
-
-        final List<String> topicList = Arrays.asList(topics.split(","));
-        messageConsumer.subscribe(topicList);
-        long startTime = System.currentTimeMillis();
-
+        final int msgCount = Integer.parseInt(args[3]);
+        final Map<String, TreeSet<String>> topicAndFiltersMap =
+                MixedUtils.parseTopicParam(topics);
+        // initialize consumer object
+        final MessagePullConsumerExample messageConsumer =
+                new MessagePullConsumerExample(masterServers, group);
+        messageConsumer.subscribe(topicAndFiltersMap);
         Thread[] fetchRunners = new Thread[3];
         for (int i = 0; i < fetchRunners.length; i++) {
-            fetchRunners[i] = new Thread(new FetchRequestRunner(messageConsumer, consumeCount));
+            fetchRunners[i] = new Thread(new FetchRequestRunner(messageConsumer, msgCount));
             fetchRunners[i].setName("_fetch_runner_" + i);
         }
-
+        // start fetch threads
         for (Thread thread : fetchRunners) {
             thread.start();
         }
-
-        Thread statisticThread = new Thread(msgRecvStats, "Sent Statistic Thread");
+        // create and start statistic thread
+        Thread statisticThread =
+                new Thread(msgRecvStats, "Sent Statistic Thread");
         statisticThread.start();
     }
 
-    public void subscribe(List<String> topicList) throws TubeClientException {
-        for (String topic : topicList) {
-            messagePullConsumer.subscribe(topic, null);
+    public void subscribe(
+            Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
+        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+            messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
         }
-
         messagePullConsumer.completeSubscribe();
     }
 
@@ -96,7 +98,8 @@ public final class MessagePullConsumerExample {
         return messagePullConsumer.getMessage();
     }
 
-    public ConsumerResult confirmConsume(final String confirmContext, boolean isConsumed) throws TubeClientException {
+    public ConsumerResult confirmConsume(final String confirmContext,
+                                         boolean isConsumed) throws TubeClientException {
         return messagePullConsumer.confirmConsume(confirmContext, isConsumed);
     }
 
@@ -109,9 +112,9 @@ public final class MessagePullConsumerExample {
         final MessagePullConsumerExample messageConsumer;
         final int consumeCount;
 
-        FetchRequestRunner(final MessagePullConsumerExample messageConsumer, int count) {
+        FetchRequestRunner(final MessagePullConsumerExample messageConsumer, int msgCount) {
             this.messageConsumer = messageConsumer;
-            this.consumeCount = count;
+            this.consumeCount = msgCount;
         }
 
         @Override
@@ -127,16 +130,10 @@ public final class MessagePullConsumerExample {
                         }
                         messageConsumer.confirmConsume(result.getConfirmContext(), true);
                     } else {
-                        if (!(result.getErrCode() == 400
-                                || result.getErrCode() == 404
-                                || result.getErrCode() == 405
-                                || result.getErrCode() == 406
-                                || result.getErrCode() == 407
-                                || result.getErrCode() == 408)) {
+                        if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
                             logger.info(
                                     "Receive messages errorCode is {}, Error message is {}",
-                                    result.getErrCode(),
-                                    result.getErrMsg());
+                                    result.getErrCode(), result.getErrMsg());
                         }
                     }
                     if (consumeCount > 0) {
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
index d3afeaa..3d49478 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
@@ -17,7 +17,7 @@
 
 package org.apache.tubemq.example;
 
-import java.util.Arrays;
+import static org.apache.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
@@ -34,9 +34,11 @@ import org.apache.tubemq.client.exception.TubeClientException;
 import org.apache.tubemq.client.factory.MessageSessionFactory;
 import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.utils.MixedUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * This demo shows how to reset offset on consuming. The main difference from {@link MessagePullConsumerExample}
  * is that we call {@link PullMessageConsumer#completeSubscribe(String, int, boolean, Map)} instead of
@@ -45,24 +47,32 @@ import org.slf4j.LoggerFactory;
  */
 public final class MessagePullSetConsumerExample {
 
-    private static final Logger logger = LoggerFactory.getLogger(MessagePullSetConsumerExample.class);
+    private static final Logger logger =
+            LoggerFactory.getLogger(MessagePullSetConsumerExample.class);
     private static final AtomicLong counter = new AtomicLong(0);
 
     private final PullMessageConsumer messagePullConsumer;
     private final MessageSessionFactory messageSessionFactory;
 
-    public MessagePullSetConsumerExample(String masterHostAndPort, String group) throws Exception {
+    public MessagePullSetConsumerExample(String masterHostAndPort,
+                                         String group) throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
         this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
     }
 
     public static void main(String[] args) {
-        final String masterHostAndPort = args[0];
+        // get and initialize parameters
+        final String masterServers = args[0];
         final String topics = args[1];
         final String group = args[2];
-        final int consumeCount = Integer.parseInt(args[3]);
-        final Map<String, Long> partOffsetMap = new ConcurrentHashMap<>();
+        final int msgCount = Integer.parseInt(args[3]);
+        final Map<String, TreeSet<String>> topicAndFiltersMap =
+                MixedUtils.parseTopicParam(topics);
+        // initialize the offset-reset parameters
+        // (the offsets specified here are for demonstration only)
+        final Map<String, Long> partOffsetMap =
+                new ConcurrentHashMap<>();
         partOffsetMap.put("123:test_1:0", 0L);
         partOffsetMap.put("123:test_1:1", 0L);
         partOffsetMap.put("123:test_1:2", 0L);
@@ -70,17 +80,15 @@ public final class MessagePullSetConsumerExample {
         partOffsetMap.put("123:test_2:1", 350L);
         partOffsetMap.put("123:test_2:2", 350L);
 
-        final List<String> topicList = Arrays.asList(topics.split(","));
-
         ExecutorService executorService = Executors.newCachedThreadPool();
         executorService.submit(new Runnable() {
             @Override
             public void run() {
                 try {
-                    int getCount = consumeCount;
+                    int getCount = msgCount;
                     MessagePullSetConsumerExample messageConsumer =
-                        new MessagePullSetConsumerExample(masterHostAndPort, group);
-                    messageConsumer.subscribe(topicList, partOffsetMap);
+                        new MessagePullSetConsumerExample(masterServers, group);
+                    messageConsumer.subscribe(topicAndFiltersMap, partOffsetMap);
                     // main logic of consuming
                     do {
                         ConsumerResult result = messageConsumer.getMessage();
@@ -125,19 +133,13 @@ public final class MessagePullSetConsumerExample {
                                     confirmResult.getErrMsg());
                             }
                         } else {
-                            if (!(result.getErrCode() == 400
-                                    || result.getErrCode() == 404
-                                    || result.getErrCode() == 405
-                                    || result.getErrCode() == 406
-                                    || result.getErrCode() == 407
-                                    || result.getErrCode() == 408)) {
+                            if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
                                 logger.info(
                                         "Receive messages errorCode is {}, Error message is {}",
-                                        result.getErrCode(),
-                                        result.getErrMsg());
+                                        result.getErrCode(), result.getErrMsg());
                             }
                         }
-                        if (consumeCount >= 0) {
+                        if (msgCount >= 0) {
                             if (--getCount <= 0) {
                                 break;
                             }
@@ -157,24 +159,16 @@ public final class MessagePullSetConsumerExample {
         }
     }
 
-    public void subscribe(
-        List<String> topicList,
-        Map<String, Long> partOffsetMap
-    ) throws TubeClientException {
-        TreeSet<String> filters = new TreeSet<>();
-        filters.add("aaa");
-        filters.add("bbb");
-        for (String topic : topicList) {
-            this.messagePullConsumer.subscribe(topic, filters);
+    public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap,
+                          Map<String, Long> partOffsetMap) throws TubeClientException {
+        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+            messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
         }
         String sessionKey = "test_reset2";
         int consumerCount = 2;
         boolean isSelectBig = false;
-        messagePullConsumer.completeSubscribe(
-            sessionKey,
-            consumerCount,
-            isSelectBig,
-            partOffsetMap);
+        messagePullConsumer.completeSubscribe(sessionKey,
+                consumerCount, isSelectBig, partOffsetMap);
     }
 
     public ConsumerResult getMessage() throws TubeClientException {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
index 1b2f25a..f544829 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
@@ -42,6 +42,7 @@ import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.utils.MixedUtils;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.common.fielddef.CliArgDef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +65,7 @@ public class CliProducer extends CliAbstractBase {
     // sent data content
     private static byte[] sentData;
     private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
-    private final List<TupleValue> topicSendRounds = new ArrayList<>();
+    private final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
     private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
     private final Map<MessageProducer, MsgSender> producerMap = new HashMap<>();
     // cli parameters
@@ -189,10 +190,10 @@ public class CliProducer extends CliAbstractBase {
         // initial topic send round
         for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
             if (entry.getValue().isEmpty()) {
-                topicSendRounds.add(new TupleValue(entry.getKey()));
+                topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
             } else {
                 for (String filter : entry.getValue()) {
-                    topicSendRounds.add(new TupleValue(entry.getKey(), filter));
+                    topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
                 }
             }
         }
@@ -266,11 +267,11 @@ public class CliProducer extends CliAbstractBase {
                 roundIndex = (int) (sentCount++ % topicAndCondCnt);
                 try {
                     long millis = System.currentTimeMillis();
-                    TupleValue tupleValue = topicSendRounds.get(roundIndex);
-                    Message message = new Message(tupleValue.topic, sentData);
-                    if (tupleValue.filter != null) {
+                    Tuple2<String, String> target = topicSendRounds.get(roundIndex);
+                    Message message = new Message(target.f0, sentData);
+                    if (target.f1 != null) {
                         // if include filter, add filter item
-                        message.putSystemHeader(tupleValue.filter, sdf.format(new Date(millis)));
+                        message.putSystemHeader(target.f1, sdf.format(new Date(millis)));
                     }
                     // use sync or async process
                     if (syncProduction) {
@@ -332,21 +333,6 @@ public class CliProducer extends CliAbstractBase {
         }
     }
 
-    private static class TupleValue {
-        public String topic = null;
-        public String filter = null;
-
-        public TupleValue(String topic) {
-            this.topic = topic;
-        }
-
-        public TupleValue(String topic, String filter) {
-            this.topic = topic;
-            this.filter = filter;
-        }
-
-    }
-
     public static void main(String[] args) {
         CliProducer cliProducer = new CliProducer();
         try {