You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/08 05:03:38 UTC

[incubator-inlong] branch INLONG-1739 updated: [INLONG-1760] Optimize the realization of class MessageProducerExample (#1762)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-1739 by this push:
     new 61611cd  [INLONG-1760] Optimize the realization of class MessageProducerExample (#1762)
61611cd is described below

commit 61611cd4d8572c7b675363448e6c3defd9aa6db0
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Nov 8 13:03:30 2021 +0800

    [INLONG-1760] Optimize the realization of class MessageProducerExample (#1762)
---
 .../inlong/tubemq/corebase/utils/MixedUtils.java   |  32 ++++
 .../tubemq/example/MessageProducerExample.java     | 208 ++++++++-------------
 .../tubemq/example/MessagePullConsumerExample.java |   8 +-
 .../example/MessagePullSetConsumerExample.java     |   9 +-
 .../tubemq/example/MessagePushConsumerExample.java |  11 +-
 ...{MsgRecvStats.java => MsgSendReceiveStats.java} |  38 ++--
 6 files changed, 154 insertions(+), 152 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
index eb432d5..18c4ca1 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
@@ -18,7 +18,9 @@
 package org.apache.inlong.tubemq.corebase.utils;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 
@@ -78,6 +80,24 @@ public class MixedUtils {
         return topicAndFiltersMap;
     }
 
+    // Flatten the topic -> filter-set map into a list of (topic, filter) tuples
+    public static List<Tuple2<String, String>> buildTopicFilterTupleList(
+            Map<String, TreeSet<String>> topicAndFiltersMap) {
+        // result list; entries follow the iteration order of topicAndFiltersMap.entrySet()
+        List<Tuple2<String, String>> topicFilterTuples = new ArrayList<>();
+        // a topic with an empty filter set yields one topic-only tuple, otherwise one tuple per filter
+        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                topicFilterTuples.add(new Tuple2<>(entry.getKey()));
+            } else {
+                for (String filter : entry.getValue()) {
+                    topicFilterTuples.add(new Tuple2<>(entry.getKey(), filter));
+                }
+            }
+        }
+        return topicFilterTuples;
+    }
+
     public static byte[] buildTestData(int bodySize) {
         final byte[] transmitData =
                 StringUtils.getBytesUtf8("This is a test data!");
@@ -91,6 +111,18 @@ public class MixedUtils {
         return dataBuffer.array();
     }
 
+    public static void coolSending(long msgSentCount) { // throttle: sleep when the count hits a milestone
+        if (msgSentCount % 5000 == 0) { // larger multiples are tested first; only the first match sleeps
+            ThreadUtils.sleep(3000); // unit is presumably milliseconds - confirm in ThreadUtils
+        } else if (msgSentCount % 4000 == 0) {
+            ThreadUtils.sleep(2000);
+        } else if (msgSentCount % 2000 == 0) {
+            ThreadUtils.sleep(800);
+        } else if (msgSentCount % 1000 == 0) {
+            ThreadUtils.sleep(400);
+        } // a count that is not a multiple of 1000 matches no branch and returns without sleeping
+    }
+
     // get the middle data between min, max, and data
     public static int mid(int data, int min, int max) {
         return Math.max(min, Math.min(max, data));
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
index 18462c7..bb4de3c 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
@@ -17,17 +17,12 @@
 
 package org.apache.inlong.tubemq.example;
 
-import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.StringUtils;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
@@ -37,7 +32,6 @@ import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
 import org.apache.inlong.tubemq.client.producer.MessageSentResult;
 import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,144 +46,104 @@ public final class MessageProducerExample {
 
     private static final Logger logger =
             LoggerFactory.getLogger(MessageProducerExample.class);
-    private static final ConcurrentHashMap<String, AtomicLong> counterMap =
-            new ConcurrentHashMap<>();
+    private static final MsgSendReceiveStats msgSendStats =
+            new MsgSendReceiveStats(true);
 
-    private final MessageProducer messageProducer;
-    private final MessageSessionFactory messageSessionFactory;
+    private static MessageSessionFactory sessionFactory;
+    private static MessageProducer messageProducer;
 
-    public MessageProducerExample(String masterServers) throws Exception {
-        TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
-        this.messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
-        this.messageProducer = messageSessionFactory.createProducer();
-    }
-
-    public static void main(String[] args) {
-        // get and initial parameters
+    // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+    //     the master address(es) to connect to;
+    // The 2nd parameter pubTopicAndFilterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+    //     the topic(s) (and filter condition set) to publish to.
+    // The 3rd parameter msgCount is the number of messages to send; a negative value sends without limit
+    // The optional 4th parameter pkgSize is the message body size; default 1024, clamped to [1, 1024 * 1024]
+    public static void main(String[] args) throws Throwable {
+        // 1. get and initialize parameters
         final String masterServers = args[0];
-        final String topics = args[1];
+        final String pubTopicAndFilterItems = args[1];
         final long msgCount = Long.parseLong(args[2]);
-        final Map<String, TreeSet<String>> topicAndFiltersMap =
-                MixedUtils.parseTopicParam(topics);
-        // initial send target
-        final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
-        // initial topic send round
-        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
-            if (entry.getValue().isEmpty()) {
-                topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
-            } else {
-                for (String filter : entry.getValue()) {
-                    topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
-                }
-            }
+        int pkgSize = 1024;
+        if (args.length > 3) {
+            pkgSize = MixedUtils.mid(Integer.parseInt(args[3]), 1, 1024 * 1024);
         }
-        // initial sent data
-        String body = "This is a test message from single-session-factory.";
-        byte[] bodyBytes = StringUtils.getBytesUtf8(body);
-        final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
-        while (dataBuffer.hasRemaining()) {
-            int offset = dataBuffer.arrayOffset();
-            dataBuffer.put(bodyBytes, offset, Math.min(dataBuffer.remaining(), bodyBytes.length));
-        }
-        dataBuffer.flip();
-        // send messages
-        try {
-            long sentCount = 0;
-            int roundIndex = 0;
-            int targetCnt = topicSendRounds.size();
-            MessageProducerExample messageProducer =
-                    new MessageProducerExample(masterServers);
-            messageProducer.publishTopics(topicAndFiltersMap.keySet());
-            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
-            while (msgCount < 0 || sentCount < msgCount) {
-                roundIndex = (int) (sentCount++ % targetCnt);
-                Tuple2<String, String> target = topicSendRounds.get(roundIndex);
-                Message message = new Message(target.getF0(), dataBuffer.array());
-                long currTimeMillis = System.currentTimeMillis();
-                message.setAttrKeyVal("index", String.valueOf(sentCount));
-                message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
-                if (target.getF1() != null) {
-                    message.putSystemHeader(target.getF1(), sdf.format(new Date(currTimeMillis)));
-                }
-                try {
-                    // 1.1 next line sends message synchronously, which is not recommended
-                    // messageProducer.sendMessage(message);
-                    // 1.2 send message asynchronously, recommended
-                    messageProducer.sendMessageAsync(message,
-                            messageProducer.new DefaultSendCallback());
-                } catch (Throwable e1) {
-                    logger.error("Send Message throw exception  ", e1);
-                }
-                // only for test, delay inflight message's count
-                if (sentCount % 20000 == 0) {
-                    ThreadUtils.sleep(4000);
-                } else if (sentCount % 10000 == 0) {
-                    ThreadUtils.sleep(2000);
-                } else if (sentCount % 2500 == 0) {
-                    ThreadUtils.sleep(300);
-                }
-            }
-            ThreadUtils.sleep(20000);
-            for (Map.Entry<String, AtomicLong> entry : counterMap.entrySet()) {
-                logger.info(
-                    "********* Current {} Message sent count is {}",
-                    entry.getKey(),
-                    entry.getValue().get()
-                );
+        final Map<String, TreeSet<String>> topicAndFiltersMap =
+                MixedUtils.parseTopicParam(pubTopicAndFilterItems);
+
+        // 2. initialize the client configuration, session factory object, and producer object
+        TubeClientConfig clientConfig =
+                new TubeClientConfig(masterServers);
+        sessionFactory = new TubeSingleSessionFactory(clientConfig);
+        messageProducer = sessionFactory.createProducer();
+        messageProducer.publish(topicAndFiltersMap.keySet());
+
+        // 3. initialize and start the statistic thread
+        Thread statisticThread =
+                new Thread(msgSendStats, "Sent Statistic Thread");
+        statisticThread.start();
+
+        // 4. build the content of the message to be sent
+        //    include message body, attributes, and time information template
+        final byte[] dataBuffer =
+                MixedUtils.buildTestData(pkgSize);
+        List<Tuple2<String, String>> buildTopicFilterTuples =
+                MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
+
+        // 5. send message to server
+        long sentCount = 0;
+        int roundIndex = 0;
+        int targetCnt = buildTopicFilterTuples.size();
+        while (msgCount < 0 || sentCount < msgCount) {
+            // 5.1 Rotate to get the attribute information to be sent
+            roundIndex = (int) (sentCount++ % targetCnt);
+            Tuple2<String, String> target = buildTopicFilterTuples.get(roundIndex);
+
+            // 5.2 build message to be sent
+            Message message = new Message(target.getF0(), dataBuffer);
+            long currTimeMillis = System.currentTimeMillis();
+            message.setAttrKeyVal("index", String.valueOf(sentCount));
+            message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
+            if (target.getF1() != null) {
+                // 5.2.1 add filter attribute information
+                message.putSystemHeader(target.getF1(), sdf.format(new Date(currTimeMillis)));
             }
-        } catch (Throwable e) {
-            e.printStackTrace();
-        }
-    }
 
-    public void publishTopics(Set<String> topicSet) throws TubeClientException {
-        this.messageProducer.publish(topicSet);
-    }
+            // 5.3 send message
+            try {
+                // 5.3.1 Send data asynchronously, recommended
+                messageProducer.sendMessage(message, new DefaultSendCallback());
 
-    /**
-     * Send message synchronous.
-     */
-    public void sendMessage(Message message) {
-        // date format is accurate to minute, not to second
-        try {
-            MessageSentResult result = messageProducer.sendMessage(message);
-            if (!result.isSuccess()) {
-                logger.error("Sync-send message failed!" + result.getErrMsg());
+                // 5.3.2 Send message synchronously, not recommended
+                // MessageSentResult result = messageProducer.sendMessage(message);
+                // if (!result.isSuccess()) {
+                //    logger.error("Sync-send message failed!" + result.getErrMsg());
+                // }
+            } catch (TubeClientException | InterruptedException e) {
+                logger.error("Async-send message failed!", e);
             }
-        } catch (TubeClientException | InterruptedException e) {
-            logger.error("Sync-send message failed!", e);
-        }
-    }
 
-    /**
-     * Send message asynchronous. More efficient and recommended.
-     */
-    public void sendMessageAsync(Message message, MessageSentCallback callback) {
-        try {
-            messageProducer.sendMessage(message, callback);
-        } catch (TubeClientException | InterruptedException e) {
-            logger.error("Async-send message failed!", e);
+            // 5.4 Cool sending
+            //     Attention: only used in the test link, to solve the problem of
+            //                frequent sending failures caused by insufficient test resources.
+            MixedUtils.coolSending(sentCount);
         }
+
+        // 6. Clean up resources and exit the service after the task is completed
+        //    Attention: TubeMQ client is suitable for serving as a resident service,
+        //               not suitable for creating Producer objects message by message
+        messageProducer.shutdown();
+        sessionFactory.shutdown();
+        msgSendStats.stopStats();
+        logger.info("The message sending task has been completed!");
     }
 
-    private class DefaultSendCallback implements MessageSentCallback {
+    private static class DefaultSendCallback implements MessageSentCallback {
 
         @Override
         public void onMessageSent(MessageSentResult result) {
             if (result.isSuccess()) {
-                String topicName = result.getMessage().getTopic();
-
-                AtomicLong currCount = counterMap.get(topicName);
-                if (currCount == null) {
-                    AtomicLong tmpCount = new AtomicLong(0);
-                    currCount = counterMap.putIfAbsent(topicName, tmpCount);
-                    if (currCount == null) {
-                        currCount = tmpCount;
-                    }
-                }
-                if (currCount.incrementAndGet() % 1000 == 0) {
-                    logger.info("Send " + topicName + " " + currCount.get() + " message!");
-                }
+                msgSendStats.addMsgCount(result.getMessage().getTopic(), 1);
             } else {
                 logger.error("Send message failed!" + result.getErrMsg());
             }
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
index 4e9674b..bf51d9e 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
@@ -46,7 +46,8 @@ public final class MessagePullConsumerExample {
     private static final Logger logger =
             LoggerFactory.getLogger(MessagePullConsumerExample.class);
 
-    private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+    private static final MsgSendReceiveStats msgRcvStats =
+            new MsgSendReceiveStats(false);
     private static PullMessageConsumer pullConsumer;
     private static MessageSessionFactory sessionFactory;
 
@@ -126,7 +127,7 @@ public final class MessagePullConsumerExample {
                                 // 4.2.1 process message if getMessage() return success
                                 List<Message> messageList = csmResult.getMessageList();
                                 if (messageList != null && !messageList.isEmpty()) {
-                                    msgRecvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+                                    msgRcvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
                                 }
                                 // 4.2.1.1 confirm consume result
                                 // Notice:
@@ -171,7 +172,7 @@ public final class MessagePullConsumerExample {
 
         // 6. initial and statistic thread
         Thread statisticThread =
-                new Thread(msgRecvStats, "Sent Statistic Thread");
+                new Thread(msgRcvStats, "Receive Statistic Thread");
         statisticThread.start();
 
         // 7. Resource cleanup when exiting the service
@@ -183,6 +184,5 @@ public final class MessagePullConsumerExample {
         // 7.3 shutdown statistic thread
         // msgRecvStats.stopStats();
     }
-
 }
 
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
index 2654e75..2bc7080 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
@@ -51,7 +51,8 @@ public final class MessagePullSetConsumerExample {
     private static final Logger logger =
             LoggerFactory.getLogger(MessagePullSetConsumerExample.class);
     // Statistic object
-    private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+    private static final MsgSendReceiveStats msgRcvStats =
+            new MsgSendReceiveStats(false);
     // The map of the master cluster and Multiple session factory
     //    There may be multiple consumers in the same process and the topic sets subscribed
     //    by different consumers are in different clusters. In this case,
@@ -82,7 +83,7 @@ public final class MessagePullSetConsumerExample {
 
         // 2. initial and statistic thread
         Thread statisticThread =
-                new Thread(msgRecvStats, "Sent Statistic Thread");
+                new Thread(msgRcvStats, "Receive Statistic Thread");
         statisticThread.start();
 
         // 3. Start the consumer group for the first consumption
@@ -129,9 +130,9 @@ public final class MessagePullSetConsumerExample {
             thread.join();
         }
 
-        logger.info("The second consumption begin!");
         // 4. Start the consumer group for the second consumption
         // 4.1 set the boostrap Offset, here we set consumption from 0
+        logger.info("The second consumption begin!");
         String sessionKeySec = "test_consume_Second";
         int sourceCountSec = 1;
         ConcurrentHashMap<String, Long> partOffsetMapSec =
@@ -229,7 +230,7 @@ public final class MessagePullSetConsumerExample {
                         // 2.1 process message if getMessage() return success
                         List<Message> messageList = csmResult.getMessageList();
                         if (messageList != null && !messageList.isEmpty()) {
-                            msgRecvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+                            msgRcvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
                         }
                         // 2.1.2 store the offset of processing message
                         //       the offset returned by GetMessage() represents the initial offset of this request
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
index 0db4acd..94c8d15 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
@@ -49,7 +49,8 @@ import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
  */
 public final class MessagePushConsumerExample {
 
-    private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+    private static final MsgSendReceiveStats msgRcvStats =
+            new MsgSendReceiveStats(false);
     private static MessageSessionFactory sessionFactory;
     private static final Map<String, PushMessageConsumer> consumerMap = new HashMap<>();
 
@@ -99,15 +100,15 @@ public final class MessagePushConsumerExample {
                 MessageListener messageListener = new DefaultMessageListener(entry.getKey());
                 consumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
             }
-            // 3.3. start consumer
+            // 3.3 start consumer
             consumer.completeSubscribe();
-            // 3.4. store consumer object
+            // 3.4 store consumer object
             consumerMap.put(consumer.getConsumerId(), consumer);
         }
 
         // 4. initial and statistic thread
         Thread statisticThread =
-                new Thread(msgRecvStats, "Sent Statistic Thread");
+                new Thread(msgRcvStats, "Receive Statistic Thread");
         statisticThread.start();
 
         // 5. Resource cleanup when exiting the service
@@ -136,7 +137,7 @@ public final class MessagePushConsumerExample {
         @Override
         public void receiveMessages(PeerInfo peerInfo, List<Message> messages) {
             if (messages != null && !messages.isEmpty()) {
-                msgRecvStats.addMsgCount(this.topic, messages.size());
+                msgRcvStats.addMsgCount(this.topic, messages.size());
             }
         }
 
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
similarity index 67%
rename from inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
rename to inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
index f418052..cfc8a55 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
@@ -28,17 +28,20 @@ import org.slf4j.LoggerFactory;
 /**
  * This demo shows how to collect and report message received statistics.
  */
-public class MsgRecvStats implements Runnable {
-    private static final Logger logger =
-            LoggerFactory.getLogger(MsgRecvStats.class);
-    private static final ConcurrentHashMap<String, AtomicLong> counterMap =
-            new ConcurrentHashMap<>();
-    private static final ConcurrentHashMap<String, AtomicLong> befCountMap =
-            new ConcurrentHashMap<>();
-    private AtomicBoolean isStarted = new AtomicBoolean(true);
+public class MsgSendReceiveStats implements Runnable {
+    private final boolean isProducer;
+    private static final Logger logger = LoggerFactory.getLogger(MsgSendReceiveStats.class);
+    private static final ConcurrentHashMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<String, AtomicLong> befCountMap = new ConcurrentHashMap<>();
+    private final AtomicBoolean isStarted = new AtomicBoolean(true);
+
+    public MsgSendReceiveStats(boolean isProducer) {
+        this.isProducer = isProducer;
+    }
 
     @Override
     public void run() {
+        // Indicator output every 30 seconds
         while (isStarted.get()) {
             try {
                 for (Map.Entry<String, AtomicLong> entry : counterMap.entrySet()) {
@@ -51,10 +54,13 @@ public class MsgRecvStats implements Runnable {
                             befCount = tmpCount;
                         }
                     }
-                    // output received statistic information
-                    logger.info("********* Current {} Message receive count is {}, dlt is {}",
-                        new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
-                    // archive historical statistic data
+                    if (isProducer) {
+                        logger.info("********* Current {} Message sent count is {}, dlt is {}",
+                                new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
+                    } else {
+                        logger.info("********* Current {} Message received count is {}, dlt is {}",
+                                new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
+                    }
                     befCountMap.get(entry.getKey()).set(currCount);
                 }
             } catch (Throwable t) {
@@ -74,6 +80,14 @@ public class MsgRecvStats implements Runnable {
                     currCount = tmpCount;
                 }
             }
+            // Log progress whenever the updated running count is an exact multiple of 1000
+            if (currCount.addAndGet(msgCnt) % 1000 == 0) {
+                if (isProducer) {
+                    logger.info("Sent " + topicName + " messages:" + currCount.get());
+                } else {
+                    logger.info("Received " + topicName + " messages:" + currCount.get());
+                }
+            }
         }
     }