You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/08 07:37:18 UTC
[incubator-inlong] branch INLONG-1739 updated: [INLONG-1761]
Optimize the realization of class MAMessageProducerExample (#1763)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-1739 by this push:
new 76e0974 [INLONG-1761] Optimize the realization of class MAMessageProducerExample (#1763)
76e0974 is described below
commit 76e097482193ab82dd282883eb94499b36d61586
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Nov 8 15:37:14 2021 +0800
[INLONG-1761] Optimize the realization of class MAMessageProducerExample (#1763)
---
.../inlong/tubemq/corebase/utils/MixedUtils.java | 23 ++
.../tubemq/example/MAMessageProducerExample.java | 283 ++++++++++-----------
.../tubemq/example/MessageProducerExample.java | 29 +--
3 files changed, 164 insertions(+), 171 deletions(-)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
index 18c4ca1..e4629b2 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
@@ -18,13 +18,16 @@
package org.apache.inlong.tubemq.corebase.utils;
import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.commons.codec.binary.StringUtils;
+import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.TokenConstants;
public class MixedUtils {
@@ -98,6 +101,7 @@ public class MixedUtils {
return topicFilterTuples;
}
+ // only for demo
public static byte[] buildTestData(int bodySize) {
final byte[] transmitData =
StringUtils.getBytesUtf8("This is a test data!");
@@ -111,6 +115,25 @@ public class MixedUtils {
return dataBuffer.array();
}
+ // build message to be sent
+ // only for demo
+ public static Message buildMessage(String topicName, String filterItem,
+ byte[] bodyData, long serialId,
+ SimpleDateFormat sdf) {
+ // build message to be sent
+ Message message = new Message(topicName, bodyData);
+ long currTimeMillis = System.currentTimeMillis();
+ // added a serial number and data generation time to each message
+ message.setAttrKeyVal("serialId", String.valueOf(serialId));
+ message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
+ if (filterItem != null) {
+ // add filter attribute information
+ message.putSystemHeader(filterItem, sdf.format(new Date(currTimeMillis)));
+ }
+ return message;
+ }
+
+ // only for demo
public static void coolSending(long msgSentCount) {
if (msgSentCount % 5000 == 0) {
ThreadUtils.sleep(3000);
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
index 4a7ab2f..574caaa 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
@@ -17,20 +17,17 @@
package org.apache.inlong.tubemq.example;
-import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.StringUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
@@ -38,9 +35,7 @@ import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,194 +48,180 @@ import org.slf4j.LoggerFactory;
public class MAMessageProducerExample {
private static final Logger logger =
LoggerFactory.getLogger(MAMessageProducerExample.class);
- private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
- private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
- private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0);
- private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0);
-
- private static final List<MessageProducer> PRODUCER_LIST = new ArrayList<>();
- private static final int MAX_PRODUCER_NUM = 100;
- private static final int SESSION_FACTORY_NUM = 10;
-
- private static Map<String, TreeSet<String>> topicAndFiltersMap;
- private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
- private static int msgCount;
- private static int clientCount;
- private static byte[] sendData;
- private static AtomicLong filterMsgCount = new AtomicLong(0);
-
- private final Map<MessageProducer, Sender> producerMap = new HashMap<>();
- private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
- private final ExecutorService sendExecutorService =
- Executors.newFixedThreadPool(MAX_PRODUCER_NUM, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable runnable) {
- return new Thread(runnable, "sender_" + producerMap.size());
- }
- });
- private final AtomicInteger producerIndex = new AtomicInteger(0);
-
- public MAMessageProducerExample(String masterHostAndPort) throws Exception {
- TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
- for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
- this.sessionFactoryList.add(new TubeMultiSessionFactory(clientConfig));
- }
- }
-
- public static void main(String[] args) {
- // get call parameters
+ private static final MsgSendReceiveStats msgSendStats =
+ new MsgSendReceiveStats(true);
+ private static final Map<Integer, Tuple2<MessageSessionFactory, Set<MessageProducer>>>
+ sessionFactoryProducerMap = new HashMap<>();
+ private static final AtomicLong totalSentCnt = new AtomicLong(0);
+ private static ExecutorService sendExecutorService;
+
+ // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+ // the master address(es) to connect to;
+ // The 2nd parameter pubTopicAndFilterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+ // the topic(s) (and filter condition set) to publish to.
+ // The 3rd parameter msgCount is the message amount that needs to be sent
+ // The 4th parameter pkgSize is the message's body size that needs to be sent
+ // The 5th parameter clientCount is the amount of producer
+ // The 6th parameter sessionFactoryCnt is the amount of session factory
+ public static void main(String[] args) throws Throwable {
+ // 1. get call parameters
final String masterServers = args[0];
- final String topics = args[1];
- msgCount = Integer.parseInt(args[2]);
- clientCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
- topicAndFiltersMap = MixedUtils.parseTopicParam(topics);
- // initial topic send round
- for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
- if (entry.getValue().isEmpty()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
- } else {
- for (String filter : entry.getValue()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
- }
- }
+ final String pubTopicAndFilterItems = args[1];
+ final long msgCount = Long.parseLong(args[2]);
+ int pkgSize = 1024;
+ if (args.length > 3) {
+ pkgSize = MixedUtils.mid(Integer.parseInt(args[3]), 1, 1024 * 1024);
+ }
+ int clientCnt = 2;
+ if (args.length > 4) {
+ clientCnt = MixedUtils.mid(Integer.parseInt(args[4]), 1, 100);
}
- // build message's body content
- final byte[] transmitData =
- StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
- final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
- while (dataBuffer.hasRemaining()) {
- int offset = dataBuffer.arrayOffset();
- dataBuffer.put(transmitData, offset,
- Math.min(dataBuffer.remaining(), transmitData.length));
+ int sessionFactoryCnt = 10;
+ if (args.length > 5) {
+ sessionFactoryCnt = MixedUtils.mid(Integer.parseInt(args[5]), 1, 20);
+ }
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(pubTopicAndFilterItems);
+
+ // 2. build multi-session factory
+ TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
+ for (int i = 0; i < sessionFactoryCnt; i++) {
+ sessionFactoryProducerMap.put(i,
+ new Tuple2<>(new TubeMultiSessionFactory(clientConfig), new HashSet<>()));
+ }
+
+ // 3. build multi-thread message sender
+ sendExecutorService =
+ Executors.newFixedThreadPool(clientCnt, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable runnable) {
+ return new Thread(runnable);
+ }
+ });
+
+ // 4. initial and statistic thread
+ Thread statisticThread =
+ new Thread(msgSendStats, "Sent Statistic Thread");
+ statisticThread.start();
+
+ // 5. build the content of the message to be sent
+ // include message body, attributes, and time information template
+ final byte[] bodyData =
+ MixedUtils.buildTestData(pkgSize);
+ List<Tuple2<String, String>> buildTopicFilterTuples =
+ MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
+
+ // 6. Rotating build and start producers in the session factory list
+ // In the same process, different TubeMultiSessionFactory objects can create
+ // independent connections for the same Broker.
+ // We increase the concurrent throughput of the system by increasing the
+ // number of links. Here we distribute the clients evenly on
+ // different TubeMultiSessionFactory objects to
+ // improve data production performance in the single process
+ for (int indexId = 0; indexId < clientCnt; indexId++) {
+ Tuple2<MessageSessionFactory, Set<MessageProducer>> sessionProducerMap =
+ sessionFactoryProducerMap.get(indexId % sessionFactoryCnt);
+ MessageProducer producer = sessionProducerMap.getF0().createProducer();
+ producer.publish(topicAndFiltersMap.keySet());
+ sessionProducerMap.getF1().add(producer);
+ sendExecutorService.submit(new Sender(indexId,
+ producer, bodyData, buildTopicFilterTuples, msgCount));
}
- dataBuffer.flip();
- sendData = dataBuffer.array();
- // print started log
- logger.info("MAMessageProducerExample.main started...");
+ // 7. wait all tasks finished
try {
- // initial producer objects
- MAMessageProducerExample messageProducer =
- new MAMessageProducerExample(masterServers);
- messageProducer.startService();
// wait util sent message's count reachs required count
- while (TOTAL_COUNTER.get() < msgCount * clientCount) {
- logger.info("Sending, total messages is {}, filter messages is {}",
- SENT_SUCC_COUNTER.get(), filterMsgCount.get());
- Thread.sleep(5000);
+ long needSentCnt = msgCount * clientCnt;
+ while (totalSentCnt.get() < needSentCnt) {
+ logger.info("Sending task is running, total = {}, finished = {}",
+ needSentCnt, totalSentCnt.get());
+ Thread.sleep(30000);
}
- logger.info("Finished, total messages is {}, filter messages is {}",
- SENT_SUCC_COUNTER.get(), filterMsgCount.get());
- messageProducer.producerMap.clear();
- messageProducer.shutdown();
- } catch (TubeClientException e) {
- logger.error("TubeClientException: ", e);
} catch (Throwable e) {
logger.error("Throwable: ", e);
}
- }
-
- public MessageProducer createProducer() throws TubeClientException {
- int index = (producerIndex.incrementAndGet()) % SESSION_FACTORY_NUM;
- return sessionFactoryList.get(index).createProducer();
- }
-
- private void startService() throws TubeClientException {
- for (int i = 0; i < clientCount; i++) {
- PRODUCER_LIST.add(createProducer());
- }
-
- for (MessageProducer producer : PRODUCER_LIST) {
- if (producer != null) {
- producerMap.put(producer, new Sender(producer));
- sendExecutorService.submit(producerMap.get(producer));
- }
- }
- }
-
- public void shutdown() throws Throwable {
+ // clear resources
sendExecutorService.shutdownNow();
- for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
- sessionFactoryList.get(i).shutdown();
+ for (int i = 0; i < sessionFactoryCnt; i++) {
+ sessionFactoryProducerMap.get(i).getF0().shutdown();
}
-
+ msgSendStats.stopStats();
+ logger.info("Sending task is finished, total sent {} messages", totalSentCnt.get());
}
- public class Sender implements Runnable {
- private MessageProducer producer;
+ public static class Sender implements Runnable {
- public Sender(MessageProducer producer) {
+ private final int indexId;
+ private final MessageProducer producer;
+ private final byte[] bodyData;
+ private final long msgCount;
+ private final List<Tuple2<String, String>> topicFilterTuples;
+
+ public Sender(int indexId, MessageProducer producer, byte[] bodyData,
+ List<Tuple2<String, String>> topicFilterTuples, long msgCount) {
+ this.indexId = indexId;
this.producer = producer;
+ this.bodyData = bodyData;
+ this.msgCount = msgCount;
+ this.topicFilterTuples = topicFilterTuples;
}
@Override
public void run() {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
- try {
- producer.publish(topicAndFiltersMap.keySet());
- } catch (Throwable t) {
- logger.error("publish exception: ", t);
- }
+ // send message to server
long sentCount = 0;
int roundIndex = 0;
- int targetCnt = topicSendRounds.size();
+ int targetCnt = topicFilterTuples.size();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
while (msgCount < 0 || sentCount < msgCount) {
- long millis = System.currentTimeMillis();
+ // 1 Rotate to get the attribute information to be sent
roundIndex = (int) (sentCount++ % targetCnt);
- Tuple2<String, String> target = topicSendRounds.get(roundIndex);
- Message message = new Message(target.getF0(), sendData);
- message.setAttrKeyVal("index", String.valueOf(sentCount));
- message.setAttrKeyVal("dataTime", String.valueOf(millis));
- if (target.getF1() != null) {
- filterMsgCount.incrementAndGet();
- message.putSystemHeader(target.getF1(), sdf.format(new Date(millis)));
- }
+ Tuple2<String, String> target = topicFilterTuples.get(roundIndex);
+
+ // 2 send message
try {
- // next line sends message synchronously, which is not recommended
- //producer.sendMessage(message);
- // send message asynchronously, recommended
- producer.sendMessage(message, new DefaultSendCallback());
- } catch (Throwable e1) {
- TOTAL_COUNTER.incrementAndGet();
- SENT_EXCEPT_COUNTER.incrementAndGet();
- logger.error("sendMessage exception: ", e1);
- }
- TOTAL_COUNTER.incrementAndGet();
- // only for test, delay inflight message's count
- if (sentCount % 5000 == 0) {
- ThreadUtils.sleep(3000);
- } else if (sentCount % 4000 == 0) {
- ThreadUtils.sleep(2000);
- } else if (sentCount % 2000 == 0) {
- ThreadUtils.sleep(800);
- } else if (sentCount % 1000 == 0) {
- ThreadUtils.sleep(400);
+ // 2.1 Send data asynchronously, recommended
+ producer.sendMessage(MixedUtils.buildMessage(target.getF0(),
+ target.getF1(), bodyData, sentCount, sdf), new DefaultSendCallback());
+ // Or
+ // 2.2 Send message synchronous, not recommended
+ // MessageSentResult result = producer.sendMessage(message);
+ // totalSentCnt.incrementAndGet();
+ // if (!result.isSuccess()) {
+ // logger.error("Sync-send message failed!" + result.getErrMsg());
+ // }
+ } catch (TubeClientException | InterruptedException e) {
+ logger.error("Send message failed!", e);
}
+ // 3 Cool sending
+ // Attention: only used in the test link, to solve the problem of
+ // frequent sending failures caused by insufficient test resources.
+ MixedUtils.coolSending(sentCount);
}
try {
producer.shutdown();
} catch (Throwable e) {
logger.error("producer shutdown error: ", e);
}
-
+ logger.info("The message sending task(" + indexId + ") has been completed!");
}
}
- private class DefaultSendCallback implements MessageSentCallback {
+ private static class DefaultSendCallback implements MessageSentCallback {
+
@Override
public void onMessageSent(MessageSentResult result) {
- TOTAL_COUNTER.incrementAndGet();
+ totalSentCnt.incrementAndGet();
if (result.isSuccess()) {
- SENT_SUCC_COUNTER.incrementAndGet();
+ msgSendStats.addMsgCount(result.getMessage().getTopic(), 1);
} else {
- SENT_FAIL_COUNTER.incrementAndGet();
+ logger.error("Send message failed!" + result.getErrMsg());
}
}
@Override
public void onException(Throwable e) {
- TOTAL_COUNTER.incrementAndGet();
- SENT_EXCEPT_COUNTER.incrementAndGet();
+ totalSentCnt.incrementAndGet();
logger.error("Send message error!", e);
}
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
index bb4de3c..afffde7 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
@@ -18,7 +18,6 @@
package org.apache.inlong.tubemq.example;
import java.text.SimpleDateFormat;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,7 +29,6 @@ import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.slf4j.Logger;
@@ -84,7 +82,7 @@ public final class MessageProducerExample {
// 4. build the content of the message to be sent
// include message body, attributes, and time information template
- final byte[] dataBuffer =
+ final byte[] bodyData =
MixedUtils.buildTestData(pkgSize);
List<Tuple2<String, String>> buildTopicFilterTuples =
MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
@@ -99,31 +97,22 @@ public final class MessageProducerExample {
roundIndex = (int) (sentCount++ % targetCnt);
Tuple2<String, String> target = buildTopicFilterTuples.get(roundIndex);
- // 5.2 build message to be sent
- Message message = new Message(target.getF0(), dataBuffer);
- long currTimeMillis = System.currentTimeMillis();
- message.setAttrKeyVal("index", String.valueOf(sentCount));
- message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
- if (target.getF1() != null) {
- // 5.2.1 add filter attribute information
- message.putSystemHeader(target.getF1(), sdf.format(new Date(currTimeMillis)));
- }
-
- // 5.3 send message
+ // 5.2 send message
try {
- // 5.3.1 Send data asynchronously, recommended
- messageProducer.sendMessage(message, new DefaultSendCallback());
-
- // 5.3.2 Send message synchronous, not recommended
+ // 5.2.1 Send data asynchronously, recommended
+ messageProducer.sendMessage(MixedUtils.buildMessage(target.getF0(),
+ target.getF1(), bodyData, sentCount, sdf), new DefaultSendCallback());
+ // Or
+ // 5.2.2 Send message synchronous, not recommended
// MessageSentResult result = messageProducer.sendMessage(message);
// if (!result.isSuccess()) {
// logger.error("Sync-send message failed!" + result.getErrMsg());
// }
} catch (TubeClientException | InterruptedException e) {
- logger.error("Async-send message failed!", e);
+ logger.error("Send message failed!", e);
}
- // 5.4 Cool sending
+ // 5.3 Cool sending
// Attention: only used in the test link, to solve the problem of
// frequent sending failures caused by insufficient test resources.
MixedUtils.coolSending(sentCount);