You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/08 07:37:18 UTC

[incubator-inlong] branch INLONG-1739 updated: [INLONG-1761] Optimize the realization of class MAMessageProducerExample (#1763)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-1739 by this push:
     new 76e0974  [INLONG-1761] Optimize the realization of class MAMessageProducerExample (#1763)
76e0974 is described below

commit 76e097482193ab82dd282883eb94499b36d61586
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Nov 8 15:37:14 2021 +0800

    [INLONG-1761] Optimize the realization of class MAMessageProducerExample (#1763)
---
 .../inlong/tubemq/corebase/utils/MixedUtils.java   |  23 ++
 .../tubemq/example/MAMessageProducerExample.java   | 283 ++++++++++-----------
 .../tubemq/example/MessageProducerExample.java     |  29 +--
 3 files changed, 164 insertions(+), 171 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
index 18c4ca1..e4629b2 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
@@ -18,13 +18,16 @@
 package org.apache.inlong.tubemq.corebase.utils;
 
 import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 
 import org.apache.commons.codec.binary.StringUtils;
+import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.TokenConstants;
 
 public class MixedUtils {
@@ -98,6 +101,7 @@ public class MixedUtils {
         return topicFilterTuples;
     }
 
+    // only for demo
     public static byte[] buildTestData(int bodySize) {
         final byte[] transmitData =
                 StringUtils.getBytesUtf8("This is a test data!");
@@ -111,6 +115,25 @@ public class MixedUtils {
         return dataBuffer.array();
     }
 
+    // build message to be sent
+    // only for demo
+    public static Message buildMessage(String topicName, String filterItem,
+                                       byte[] bodyData, long serialId,
+                                       SimpleDateFormat sdf) {
+        // build message to be sent
+        Message message = new Message(topicName, bodyData);
+        long currTimeMillis = System.currentTimeMillis();
+        // added a serial number and data generation time to each message
+        message.setAttrKeyVal("serialId", String.valueOf(serialId));
+        message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
+        if (filterItem != null) {
+            // add filter attribute information
+            message.putSystemHeader(filterItem, sdf.format(new Date(currTimeMillis)));
+        }
+        return message;
+    }
+
+    // only for demo
     public static void coolSending(long msgSentCount) {
         if (msgSentCount % 5000 == 0) {
             ThreadUtils.sleep(3000);
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
index 4a7ab2f..574caaa 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
@@ -17,20 +17,17 @@
 
 package org.apache.inlong.tubemq.example;
 
-import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.StringUtils;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
@@ -38,9 +35,7 @@ import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
 import org.apache.inlong.tubemq.client.producer.MessageProducer;
 import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
 import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,194 +48,180 @@ import org.slf4j.LoggerFactory;
 public class MAMessageProducerExample {
     private static final Logger logger =
             LoggerFactory.getLogger(MAMessageProducerExample.class);
-    private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
-    private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
-    private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0);
-    private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0);
-
-    private static final List<MessageProducer> PRODUCER_LIST = new ArrayList<>();
-    private static final int MAX_PRODUCER_NUM = 100;
-    private static final int SESSION_FACTORY_NUM = 10;
-
-    private static Map<String, TreeSet<String>> topicAndFiltersMap;
-    private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
-    private static int msgCount;
-    private static int clientCount;
-    private static byte[] sendData;
-    private static AtomicLong filterMsgCount = new AtomicLong(0);
-
-    private final Map<MessageProducer, Sender> producerMap = new HashMap<>();
-    private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
-    private final ExecutorService sendExecutorService =
-            Executors.newFixedThreadPool(MAX_PRODUCER_NUM, new ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable runnable) {
-                    return new Thread(runnable, "sender_" + producerMap.size());
-                }
-            });
-    private final AtomicInteger producerIndex = new AtomicInteger(0);
-
-    public MAMessageProducerExample(String masterHostAndPort) throws Exception {
-        TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
-        for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
-            this.sessionFactoryList.add(new TubeMultiSessionFactory(clientConfig));
-        }
-    }
-
-    public static void main(String[] args) {
-        // get call parameters
+    private static final MsgSendReceiveStats msgSendStats =
+            new MsgSendReceiveStats(true);
+    private static final Map<Integer, Tuple2<MessageSessionFactory, Set<MessageProducer>>>
+            sessionFactoryProducerMap = new HashMap<>();
+    private static final AtomicLong totalSentCnt = new AtomicLong(0);
+    private static ExecutorService sendExecutorService;
+
+    // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+    //     the master address(es) to connect to;
+    // The 2nd parameter pubTopicAndFilterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+    //     the topic(s) (and filter condition set) to publish to.
+    // The 3rd parameter msgCount is the message amount that needs to be sent
+    // The 4th parameter pkgSize is the message's body size that needs to be sent
+    // The 5th parameter clientCount is the amount of producer
+    // The 6th parameter sessionFactoryCnt is the amount of session factory
+    public static void main(String[] args) throws Throwable {
+        // 1. get call parameters
         final String masterServers = args[0];
-        final String topics = args[1];
-        msgCount = Integer.parseInt(args[2]);
-        clientCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
-        topicAndFiltersMap = MixedUtils.parseTopicParam(topics);
-        // initial topic send round
-        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
-            if (entry.getValue().isEmpty()) {
-                topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
-            } else {
-                for (String filter : entry.getValue()) {
-                    topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
-                }
-            }
+        final String pubTopicAndFilterItems = args[1];
+        final long msgCount = Long.parseLong(args[2]);
+        int pkgSize = 1024;
+        if (args.length > 3) {
+            pkgSize = MixedUtils.mid(Integer.parseInt(args[3]), 1, 1024 * 1024);
+        }
+        int clientCnt = 2;
+        if (args.length > 4) {
+            clientCnt = MixedUtils.mid(Integer.parseInt(args[4]), 1, 100);
         }
-        // build message's body content
-        final byte[] transmitData =
-                StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
-        final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
-        while (dataBuffer.hasRemaining()) {
-            int offset = dataBuffer.arrayOffset();
-            dataBuffer.put(transmitData, offset,
-                    Math.min(dataBuffer.remaining(), transmitData.length));
+        int sessionFactoryCnt = 10;
+        if (args.length > 5) {
+            sessionFactoryCnt = MixedUtils.mid(Integer.parseInt(args[5]), 1, 20);
+        }
+        final Map<String, TreeSet<String>> topicAndFiltersMap =
+                MixedUtils.parseTopicParam(pubTopicAndFilterItems);
+
+        // 2. build multi-session factory
+        TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
+        for (int i = 0; i < sessionFactoryCnt; i++) {
+            sessionFactoryProducerMap.put(i,
+                    new Tuple2<>(new TubeMultiSessionFactory(clientConfig), new HashSet<>()));
+        }
+
+        // 3. build multi-thread message sender
+        sendExecutorService =
+                Executors.newFixedThreadPool(clientCnt, new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable runnable) {
+                        return new Thread(runnable);
+                    }
+                });
+
+        // 4. initial and statistic thread
+        Thread statisticThread =
+                new Thread(msgSendStats, "Sent Statistic Thread");
+        statisticThread.start();
+
+        // 5. build the content of the message to be sent
+        //    include message body, attributes, and time information template
+        final byte[] bodyData =
+                MixedUtils.buildTestData(pkgSize);
+        List<Tuple2<String, String>> buildTopicFilterTuples =
+                MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
+
+        // 6. Rotating build and start producers in the session factory list
+        //    In the same process, different TubeMultiSessionFactory objects can create
+        //    independent connections for the same Broker.
+        //   We increase the concurrent throughput of the system by increasing the
+        //   number of links. Here we distribute the clients evenly on
+        //   different TubeMultiSessionFactory objects to
+        //   improve data production performance in the single process
+        for (int indexId = 0; indexId < clientCnt; indexId++) {
+            Tuple2<MessageSessionFactory, Set<MessageProducer>> sessionProducerMap =
+                    sessionFactoryProducerMap.get(indexId % sessionFactoryCnt);
+            MessageProducer producer = sessionProducerMap.getF0().createProducer();
+            producer.publish(topicAndFiltersMap.keySet());
+            sessionProducerMap.getF1().add(producer);
+            sendExecutorService.submit(new Sender(indexId,
+                    producer, bodyData, buildTopicFilterTuples, msgCount));
         }
-        dataBuffer.flip();
-        sendData = dataBuffer.array();
-        // print started log
-        logger.info("MAMessageProducerExample.main started...");
 
+        // 7. wait all tasks finished
         try {
-            // initial producer objects
-            MAMessageProducerExample messageProducer =
-                    new MAMessageProducerExample(masterServers);
-            messageProducer.startService();
             // wait util sent message's count reachs required count
-            while (TOTAL_COUNTER.get() < msgCount * clientCount) {
-                logger.info("Sending, total messages is {}, filter messages is {}",
-                        SENT_SUCC_COUNTER.get(), filterMsgCount.get());
-                Thread.sleep(5000);
+            long needSentCnt = msgCount * clientCnt;
+            while (totalSentCnt.get() < needSentCnt) {
+                logger.info("Sending task is running, total = {}, finished = {}",
+                        needSentCnt, totalSentCnt.get());
+                Thread.sleep(30000);
             }
-            logger.info("Finished, total messages is {}, filter messages is {}",
-                    SENT_SUCC_COUNTER.get(), filterMsgCount.get());
-            messageProducer.producerMap.clear();
-            messageProducer.shutdown();
-        } catch (TubeClientException e) {
-            logger.error("TubeClientException: ", e);
         } catch (Throwable e) {
             logger.error("Throwable: ", e);
         }
-    }
-
-    public MessageProducer createProducer() throws TubeClientException {
-        int index = (producerIndex.incrementAndGet()) % SESSION_FACTORY_NUM;
-        return sessionFactoryList.get(index).createProducer();
-    }
-
-    private void startService() throws TubeClientException {
-        for (int i = 0; i < clientCount; i++) {
-            PRODUCER_LIST.add(createProducer());
-        }
-
-        for (MessageProducer producer : PRODUCER_LIST) {
-            if (producer != null) {
-                producerMap.put(producer, new Sender(producer));
-                sendExecutorService.submit(producerMap.get(producer));
-            }
-        }
-    }
-
-    public void shutdown() throws Throwable {
+        // clear resources
         sendExecutorService.shutdownNow();
-        for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
-            sessionFactoryList.get(i).shutdown();
+        for (int i = 0; i < sessionFactoryCnt; i++) {
+            sessionFactoryProducerMap.get(i).getF0().shutdown();
         }
-
+        msgSendStats.stopStats();
+        logger.info("Sending task is finished, total sent {} messages", totalSentCnt.get());
     }
 
-    public class Sender implements Runnable {
-        private MessageProducer producer;
+    public static class Sender implements Runnable {
 
-        public Sender(MessageProducer producer) {
+        private final int indexId;
+        private final MessageProducer producer;
+        private final byte[] bodyData;
+        private final long msgCount;
+        private final List<Tuple2<String, String>> topicFilterTuples;
+
+        public Sender(int indexId, MessageProducer producer, byte[] bodyData,
+                      List<Tuple2<String, String>> topicFilterTuples, long msgCount) {
+            this.indexId = indexId;
             this.producer = producer;
+            this.bodyData = bodyData;
+            this.msgCount = msgCount;
+            this.topicFilterTuples = topicFilterTuples;
         }
 
         @Override
         public void run() {
-            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
-            try {
-                producer.publish(topicAndFiltersMap.keySet());
-            } catch (Throwable t) {
-                logger.error("publish exception: ", t);
-            }
+            // send message to server
             long sentCount = 0;
             int roundIndex = 0;
-            int targetCnt = topicSendRounds.size();
+            int targetCnt = topicFilterTuples.size();
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
             while (msgCount < 0 || sentCount < msgCount) {
-                long millis = System.currentTimeMillis();
+                // 1 Rotate to get the attribute information to be sent
                 roundIndex = (int) (sentCount++ % targetCnt);
-                Tuple2<String, String> target = topicSendRounds.get(roundIndex);
-                Message message = new Message(target.getF0(), sendData);
-                message.setAttrKeyVal("index", String.valueOf(sentCount));
-                message.setAttrKeyVal("dataTime", String.valueOf(millis));
-                if (target.getF1() != null) {
-                    filterMsgCount.incrementAndGet();
-                    message.putSystemHeader(target.getF1(), sdf.format(new Date(millis)));
-                }
+                Tuple2<String, String> target = topicFilterTuples.get(roundIndex);
+
+                // 2 send message
                 try {
-                    // next line sends message synchronously, which is not recommended
-                    //producer.sendMessage(message);
-                    // send message asynchronously, recommended
-                    producer.sendMessage(message, new DefaultSendCallback());
-                } catch (Throwable e1) {
-                    TOTAL_COUNTER.incrementAndGet();
-                    SENT_EXCEPT_COUNTER.incrementAndGet();
-                    logger.error("sendMessage exception: ", e1);
-                }
-                TOTAL_COUNTER.incrementAndGet();
-                // only for test, delay inflight message's count
-                if (sentCount % 5000 == 0) {
-                    ThreadUtils.sleep(3000);
-                } else if (sentCount % 4000 == 0) {
-                    ThreadUtils.sleep(2000);
-                } else if (sentCount % 2000 == 0) {
-                    ThreadUtils.sleep(800);
-                } else if (sentCount % 1000 == 0) {
-                    ThreadUtils.sleep(400);
+                    // 2.1 Send data asynchronously, recommended
+                    producer.sendMessage(MixedUtils.buildMessage(target.getF0(),
+                            target.getF1(), bodyData, sentCount, sdf), new DefaultSendCallback());
+                    // Or
+                    // 2.2 Send message synchronous, not recommended
+                    // MessageSentResult result = producer.sendMessage(message);
+                    // totalSentCnt.incrementAndGet();
+                    // if (!result.isSuccess()) {
+                    //    logger.error("Sync-send message failed!" + result.getErrMsg());
+                    // }
+                } catch (TubeClientException | InterruptedException e) {
+                    logger.error("Send message failed!", e);
                 }
+                // 3 Cool sending
+                //     Attention: only used in the test link, to solve the problem of
+                //                frequent sending failures caused by insufficient test resources.
+                MixedUtils.coolSending(sentCount);
             }
             try {
                 producer.shutdown();
             } catch (Throwable e) {
                 logger.error("producer shutdown error: ", e);
             }
-
+            logger.info("The message sending task(" + indexId + ") has been completed!");
         }
     }
 
-    private class DefaultSendCallback implements MessageSentCallback {
+    private static class DefaultSendCallback implements MessageSentCallback {
+
         @Override
         public void onMessageSent(MessageSentResult result) {
-            TOTAL_COUNTER.incrementAndGet();
+            totalSentCnt.incrementAndGet();
             if (result.isSuccess()) {
-                SENT_SUCC_COUNTER.incrementAndGet();
+                msgSendStats.addMsgCount(result.getMessage().getTopic(), 1);
             } else {
-                SENT_FAIL_COUNTER.incrementAndGet();
+                logger.error("Send message failed!" + result.getErrMsg());
             }
         }
 
         @Override
         public void onException(Throwable e) {
-            TOTAL_COUNTER.incrementAndGet();
-            SENT_EXCEPT_COUNTER.incrementAndGet();
+            totalSentCnt.incrementAndGet();
             logger.error("Send message error!", e);
         }
     }
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
index bb4de3c..afffde7 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.tubemq.example;
 
 import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,7 +29,6 @@ import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.inlong.tubemq.client.producer.MessageProducer;
 import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
 import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
 import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.slf4j.Logger;
@@ -84,7 +82,7 @@ public final class MessageProducerExample {
 
         // 4. build the content of the message to be sent
         //    include message body, attributes, and time information template
-        final byte[] dataBuffer =
+        final byte[] bodyData =
                 MixedUtils.buildTestData(pkgSize);
         List<Tuple2<String, String>> buildTopicFilterTuples =
                 MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
@@ -99,31 +97,22 @@ public final class MessageProducerExample {
             roundIndex = (int) (sentCount++ % targetCnt);
             Tuple2<String, String> target = buildTopicFilterTuples.get(roundIndex);
 
-            // 5.2 build message to be sent
-            Message message = new Message(target.getF0(), dataBuffer);
-            long currTimeMillis = System.currentTimeMillis();
-            message.setAttrKeyVal("index", String.valueOf(sentCount));
-            message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
-            if (target.getF1() != null) {
-                // 5.2.1 add filter attribute information
-                message.putSystemHeader(target.getF1(), sdf.format(new Date(currTimeMillis)));
-            }
-
-            // 5.3 send message
+            // 5.2 send message
             try {
-                // 5.3.1 Send data asynchronously, recommended
-                messageProducer.sendMessage(message, new DefaultSendCallback());
-
-                // 5.3.2 Send message synchronous, not recommended
+                // 5.2.1 Send data asynchronously, recommended
+                messageProducer.sendMessage(MixedUtils.buildMessage(target.getF0(),
+                        target.getF1(), bodyData, sentCount, sdf), new DefaultSendCallback());
+                // Or
+                // 5.2.2 Send message synchronous, not recommended
                 // MessageSentResult result = messageProducer.sendMessage(message);
                 // if (!result.isSuccess()) {
                 //    logger.error("Sync-send message failed!" + result.getErrMsg());
                 // }
             } catch (TubeClientException | InterruptedException e) {
-                logger.error("Async-send message failed!", e);
+                logger.error("Send message failed!", e);
             }
 
-            // 5.4 Cool sending
+            // 5.3 Cool sending
             //     Attention: only used in the test link, to solve the problem of
             //                frequent sending failures caused by insufficient test resources.
             MixedUtils.coolSending(sentCount);