You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/08 10:29:43 UTC
[incubator-inlong] branch master updated: [INLONG-1739][Improve]
Optimization of TubeMQ SDK usage demo (#1767)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 209bc63 [INLONG-1739][Improve] Optimization of TubeMQ SDK usage demo (#1767)
209bc63 is described below
commit 209bc63bab8399a986f259486be5cb36c888e77d
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Nov 8 18:29:35 2021 +0800
[INLONG-1739][Improve] Optimization of TubeMQ SDK usage demo (#1767)
---
.../tubemq/client/config/TubeClientConfig.java | 2 +-
.../tubemq/client/consumer/MessageConsumer.java | 32 ++
.../inlong/tubemq/corebase/cluster/MasterInfo.java | 8 +-
.../inlong/tubemq/corebase/utils/MixedUtils.java | 55 ++++
.../tubemq/example/MAMessageProducerExample.java | 283 ++++++++---------
.../tubemq/example/MessageConsumerExample.java | 148 ---------
.../tubemq/example/MessageProducerExample.java | 203 +++++--------
.../tubemq/example/MessagePullConsumerExample.java | 221 ++++++++------
.../example/MessagePullSetConsumerExample.java | 335 +++++++++++++--------
.../tubemq/example/MessagePushConsumerExample.java | 153 ++++++++++
...{MsgRecvStats.java => MsgSendReceiveStats.java} | 35 ++-
.../tubemq/server/tools/cli/CliProducer.java | 35 +--
12 files changed, 814 insertions(+), 696 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
index 83285b2..c567c37 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
@@ -29,7 +29,7 @@ import org.apache.inlong.tubemq.corerpc.RpcConstants;
*/
public class TubeClientConfig {
// Master information.
- private MasterInfo masterInfo;
+ private final MasterInfo masterInfo;
// Rpc read time out.
private long rpcReadTimeoutMs = RpcConstants.CFG_RPC_READ_TIMEOUT_DEFAULT_MS;
// Rpc connection processor number.
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
index 2f3a1ce..df98490 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
@@ -35,6 +35,11 @@ public interface MessageConsumer extends Shutdownable {
boolean isFilterConsume(String topic);
+ /**
+ * Get consume offset information of the current registered partitions
+ *
+ * @return consume offset information
+ */
Map<String, ConsumeOffsetInfo> getCurConsumedPartitions() throws TubeClientException;
/**
@@ -73,8 +78,35 @@ public interface MessageConsumer extends Shutdownable {
*/
Map<String, Long> getFrozenPartInfo();
+ /**
+ * Start consume messages with default setting
+ */
void completeSubscribe() throws TubeClientException;
+ /**
+ * Start consumption with the precise Offset settings
+ *
+ * The parameter sessionKey is specified by the caller, similar to the JobID in Flink,
+ * which is used to identify the unrelated offset reset consumption activities before and after.
+ * Each reset operation needs to ensure that it is different from the last reset carried sessionKey;
+ *
+ * The parameter sourceCount is used to inform the server how many consumers will consume
+ * in this round of consumer group activation, and the client will not consume data until
+ * the consumer group has not reached the specified number of consumers.
+ *
+ * The parameter isSelectBig is used to inform the server that if multiple clients reset
+ * the offset to the same partition, the server will use the largest offset
+ * or the smallest offset as the standard;
+ *
+ * The parameter partOffsetMap is used to inform the server that this consumption expects
+ * the partitions in the Map to be consumed according to the specified offset value.
+ * The offset in the Map comes from the consumer's query from the server, or the content
+ * returned when the consumer successfully consumes the data before, including push
+ * the PearInfo object returned by the callback function during consumption and
+ * the PearInfo object in the ConsumerResult class during Pull consumption.
+ * The Key in the Map is the partitionKey carried in PearInfo, and the value is
+ * the currOffset value carried in PearInfo.
+ */
void completeSubscribe(String sessionKey,
int sourceCount,
boolean isSelectBig,
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
index 2de42ba..680bb05 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
@@ -29,9 +29,9 @@ public class MasterInfo {
private final Map<String/* ip:port */, NodeAddrInfo> addrMap4Failover =
new HashMap<>();
- private List<String> nodeHostPortList;
+ private final List<String> nodeHostPortList;
private NodeAddrInfo firstNodeAddr = null;
- private String masterClusterStr;
+ private final String masterClusterStr;
/**
* masterAddrInfo: "ip1:port,ip2:port"
@@ -70,9 +70,7 @@ public class MasterInfo {
}
int port = Integer.parseInt(hostPortItem[1].trim());
NodeAddrInfo tmpNodeAddrInfo = new NodeAddrInfo(hostName, port);
- if (addrMap4Failover.get(tmpNodeAddrInfo.getHostPortStr()) == null) {
- addrMap4Failover.put(tmpNodeAddrInfo.getHostPortStr(), tmpNodeAddrInfo);
- }
+ addrMap4Failover.putIfAbsent(tmpNodeAddrInfo.getHostPortStr(), tmpNodeAddrInfo);
if (this.firstNodeAddr == null) {
this.firstNodeAddr = tmpNodeAddrInfo;
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
index eb432d5..e4629b2 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
@@ -18,11 +18,16 @@
package org.apache.inlong.tubemq.corebase.utils;
import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.commons.codec.binary.StringUtils;
+import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.TokenConstants;
public class MixedUtils {
@@ -78,6 +83,25 @@ public class MixedUtils {
return topicAndFiltersMap;
}
+ // build the topic and filter item pair carried in the message
+ public static List<Tuple2<String, String>> buildTopicFilterTupleList(
+ Map<String, TreeSet<String>> topicAndFiltersMap) {
+ // initial send target
+ List<Tuple2<String, String>> topicFilterTuples = new ArrayList<>();
+ // initial topic send round
+ for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ topicFilterTuples.add(new Tuple2<>(entry.getKey()));
+ } else {
+ for (String filter : entry.getValue()) {
+ topicFilterTuples.add(new Tuple2<>(entry.getKey(), filter));
+ }
+ }
+ }
+ return topicFilterTuples;
+ }
+
+ // only for demo
public static byte[] buildTestData(int bodySize) {
final byte[] transmitData =
StringUtils.getBytesUtf8("This is a test data!");
@@ -91,6 +115,37 @@ public class MixedUtils {
return dataBuffer.array();
}
+ // build message to be sent
+ // only for demo
+ public static Message buildMessage(String topicName, String filterItem,
+ byte[] bodyData, long serialId,
+ SimpleDateFormat sdf) {
+ // build message to be sent
+ Message message = new Message(topicName, bodyData);
+ long currTimeMillis = System.currentTimeMillis();
+ // added a serial number and data generation time to each message
+ message.setAttrKeyVal("serialId", String.valueOf(serialId));
+ message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
+ if (filterItem != null) {
+ // add filter attribute information
+ message.putSystemHeader(filterItem, sdf.format(new Date(currTimeMillis)));
+ }
+ return message;
+ }
+
+ // only for demo
+ public static void coolSending(long msgSentCount) {
+ if (msgSentCount % 5000 == 0) {
+ ThreadUtils.sleep(3000);
+ } else if (msgSentCount % 4000 == 0) {
+ ThreadUtils.sleep(2000);
+ } else if (msgSentCount % 2000 == 0) {
+ ThreadUtils.sleep(800);
+ } else if (msgSentCount % 1000 == 0) {
+ ThreadUtils.sleep(400);
+ }
+ }
+
// get the middle data between min, max, and data
public static int mid(int data, int min, int max) {
return Math.max(min, Math.min(max, data));
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
index 4a7ab2f..574caaa 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MAMessageProducerExample.java
@@ -17,20 +17,17 @@
package org.apache.inlong.tubemq.example;
-import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.StringUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
@@ -38,9 +35,7 @@ import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,194 +48,180 @@ import org.slf4j.LoggerFactory;
public class MAMessageProducerExample {
private static final Logger logger =
LoggerFactory.getLogger(MAMessageProducerExample.class);
- private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
- private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
- private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0);
- private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0);
-
- private static final List<MessageProducer> PRODUCER_LIST = new ArrayList<>();
- private static final int MAX_PRODUCER_NUM = 100;
- private static final int SESSION_FACTORY_NUM = 10;
-
- private static Map<String, TreeSet<String>> topicAndFiltersMap;
- private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
- private static int msgCount;
- private static int clientCount;
- private static byte[] sendData;
- private static AtomicLong filterMsgCount = new AtomicLong(0);
-
- private final Map<MessageProducer, Sender> producerMap = new HashMap<>();
- private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
- private final ExecutorService sendExecutorService =
- Executors.newFixedThreadPool(MAX_PRODUCER_NUM, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable runnable) {
- return new Thread(runnable, "sender_" + producerMap.size());
- }
- });
- private final AtomicInteger producerIndex = new AtomicInteger(0);
-
- public MAMessageProducerExample(String masterHostAndPort) throws Exception {
- TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
- for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
- this.sessionFactoryList.add(new TubeMultiSessionFactory(clientConfig));
- }
- }
-
- public static void main(String[] args) {
- // get call parameters
+ private static final MsgSendReceiveStats msgSendStats =
+ new MsgSendReceiveStats(true);
+ private static final Map<Integer, Tuple2<MessageSessionFactory, Set<MessageProducer>>>
+ sessionFactoryProducerMap = new HashMap<>();
+ private static final AtomicLong totalSentCnt = new AtomicLong(0);
+ private static ExecutorService sendExecutorService;
+
+ // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+ // the master address(es) to connect to;
+ // The 2nd parameter pubTopicAndFilterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+ // the topic(s) (and filter condition set) to publish to.
+ // The 3rd parameter msgCount is the message amount that needs to be sent
+ // The 4th parameter pkgSize is the message's body size that needs to be sent
+ // The 5th parameter clientCount is the amount of producer
+ // The 6th parameter sessionFactoryCnt is the amount of session factory
+ public static void main(String[] args) throws Throwable {
+ // 1. get call parameters
final String masterServers = args[0];
- final String topics = args[1];
- msgCount = Integer.parseInt(args[2]);
- clientCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
- topicAndFiltersMap = MixedUtils.parseTopicParam(topics);
- // initial topic send round
- for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
- if (entry.getValue().isEmpty()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
- } else {
- for (String filter : entry.getValue()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
- }
- }
+ final String pubTopicAndFilterItems = args[1];
+ final long msgCount = Long.parseLong(args[2]);
+ int pkgSize = 1024;
+ if (args.length > 3) {
+ pkgSize = MixedUtils.mid(Integer.parseInt(args[3]), 1, 1024 * 1024);
+ }
+ int clientCnt = 2;
+ if (args.length > 4) {
+ clientCnt = MixedUtils.mid(Integer.parseInt(args[4]), 1, 100);
}
- // build message's body content
- final byte[] transmitData =
- StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
- final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
- while (dataBuffer.hasRemaining()) {
- int offset = dataBuffer.arrayOffset();
- dataBuffer.put(transmitData, offset,
- Math.min(dataBuffer.remaining(), transmitData.length));
+ int sessionFactoryCnt = 10;
+ if (args.length > 5) {
+ sessionFactoryCnt = MixedUtils.mid(Integer.parseInt(args[5]), 1, 20);
+ }
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(pubTopicAndFilterItems);
+
+ // 2. build multi-session factory
+ TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
+ for (int i = 0; i < sessionFactoryCnt; i++) {
+ sessionFactoryProducerMap.put(i,
+ new Tuple2<>(new TubeMultiSessionFactory(clientConfig), new HashSet<>()));
+ }
+
+ // 3. build multi-thread message sender
+ sendExecutorService =
+ Executors.newFixedThreadPool(clientCnt, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable runnable) {
+ return new Thread(runnable);
+ }
+ });
+
+ // 4. initial and statistic thread
+ Thread statisticThread =
+ new Thread(msgSendStats, "Sent Statistic Thread");
+ statisticThread.start();
+
+ // 5. build the content of the message to be sent
+ // include message body, attributes, and time information template
+ final byte[] bodyData =
+ MixedUtils.buildTestData(pkgSize);
+ List<Tuple2<String, String>> buildTopicFilterTuples =
+ MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
+
+ // 6. Rotating build and start producers in the session factory list
+ // In the same process, different TubeMultiSessionFactory objects can create
+ // independent connections for the same Broker.
+ // We increase the concurrent throughput of the system by increasing the
+ // number of links. Here we distribute the clients evenly on
+ // different TubeMultiSessionFactory objects to
+ // improve data production performance in the single process
+ for (int indexId = 0; indexId < clientCnt; indexId++) {
+ Tuple2<MessageSessionFactory, Set<MessageProducer>> sessionProducerMap =
+ sessionFactoryProducerMap.get(indexId % sessionFactoryCnt);
+ MessageProducer producer = sessionProducerMap.getF0().createProducer();
+ producer.publish(topicAndFiltersMap.keySet());
+ sessionProducerMap.getF1().add(producer);
+ sendExecutorService.submit(new Sender(indexId,
+ producer, bodyData, buildTopicFilterTuples, msgCount));
}
- dataBuffer.flip();
- sendData = dataBuffer.array();
- // print started log
- logger.info("MAMessageProducerExample.main started...");
+ // 7. wait all tasks finished
try {
- // initial producer objects
- MAMessageProducerExample messageProducer =
- new MAMessageProducerExample(masterServers);
- messageProducer.startService();
// wait util sent message's count reachs required count
- while (TOTAL_COUNTER.get() < msgCount * clientCount) {
- logger.info("Sending, total messages is {}, filter messages is {}",
- SENT_SUCC_COUNTER.get(), filterMsgCount.get());
- Thread.sleep(5000);
+ long needSentCnt = msgCount * clientCnt;
+ while (totalSentCnt.get() < needSentCnt) {
+ logger.info("Sending task is running, total = {}, finished = {}",
+ needSentCnt, totalSentCnt.get());
+ Thread.sleep(30000);
}
- logger.info("Finished, total messages is {}, filter messages is {}",
- SENT_SUCC_COUNTER.get(), filterMsgCount.get());
- messageProducer.producerMap.clear();
- messageProducer.shutdown();
- } catch (TubeClientException e) {
- logger.error("TubeClientException: ", e);
} catch (Throwable e) {
logger.error("Throwable: ", e);
}
- }
-
- public MessageProducer createProducer() throws TubeClientException {
- int index = (producerIndex.incrementAndGet()) % SESSION_FACTORY_NUM;
- return sessionFactoryList.get(index).createProducer();
- }
-
- private void startService() throws TubeClientException {
- for (int i = 0; i < clientCount; i++) {
- PRODUCER_LIST.add(createProducer());
- }
-
- for (MessageProducer producer : PRODUCER_LIST) {
- if (producer != null) {
- producerMap.put(producer, new Sender(producer));
- sendExecutorService.submit(producerMap.get(producer));
- }
- }
- }
-
- public void shutdown() throws Throwable {
+ // clear resources
sendExecutorService.shutdownNow();
- for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
- sessionFactoryList.get(i).shutdown();
+ for (int i = 0; i < sessionFactoryCnt; i++) {
+ sessionFactoryProducerMap.get(i).getF0().shutdown();
}
-
+ msgSendStats.stopStats();
+ logger.info("Sending task is finished, total sent {} messages", totalSentCnt.get());
}
- public class Sender implements Runnable {
- private MessageProducer producer;
+ public static class Sender implements Runnable {
- public Sender(MessageProducer producer) {
+ private final int indexId;
+ private final MessageProducer producer;
+ private final byte[] bodyData;
+ private final long msgCount;
+ private final List<Tuple2<String, String>> topicFilterTuples;
+
+ public Sender(int indexId, MessageProducer producer, byte[] bodyData,
+ List<Tuple2<String, String>> topicFilterTuples, long msgCount) {
+ this.indexId = indexId;
this.producer = producer;
+ this.bodyData = bodyData;
+ this.msgCount = msgCount;
+ this.topicFilterTuples = topicFilterTuples;
}
@Override
public void run() {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
- try {
- producer.publish(topicAndFiltersMap.keySet());
- } catch (Throwable t) {
- logger.error("publish exception: ", t);
- }
+ // send message to server
long sentCount = 0;
int roundIndex = 0;
- int targetCnt = topicSendRounds.size();
+ int targetCnt = topicFilterTuples.size();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
while (msgCount < 0 || sentCount < msgCount) {
- long millis = System.currentTimeMillis();
+ // 1 Rotate to get the attribute information to be sent
roundIndex = (int) (sentCount++ % targetCnt);
- Tuple2<String, String> target = topicSendRounds.get(roundIndex);
- Message message = new Message(target.getF0(), sendData);
- message.setAttrKeyVal("index", String.valueOf(sentCount));
- message.setAttrKeyVal("dataTime", String.valueOf(millis));
- if (target.getF1() != null) {
- filterMsgCount.incrementAndGet();
- message.putSystemHeader(target.getF1(), sdf.format(new Date(millis)));
- }
+ Tuple2<String, String> target = topicFilterTuples.get(roundIndex);
+
+ // 2 send message
try {
- // next line sends message synchronously, which is not recommended
- //producer.sendMessage(message);
- // send message asynchronously, recommended
- producer.sendMessage(message, new DefaultSendCallback());
- } catch (Throwable e1) {
- TOTAL_COUNTER.incrementAndGet();
- SENT_EXCEPT_COUNTER.incrementAndGet();
- logger.error("sendMessage exception: ", e1);
- }
- TOTAL_COUNTER.incrementAndGet();
- // only for test, delay inflight message's count
- if (sentCount % 5000 == 0) {
- ThreadUtils.sleep(3000);
- } else if (sentCount % 4000 == 0) {
- ThreadUtils.sleep(2000);
- } else if (sentCount % 2000 == 0) {
- ThreadUtils.sleep(800);
- } else if (sentCount % 1000 == 0) {
- ThreadUtils.sleep(400);
+ // 2.1 Send data asynchronously, recommended
+ producer.sendMessage(MixedUtils.buildMessage(target.getF0(),
+ target.getF1(), bodyData, sentCount, sdf), new DefaultSendCallback());
+ // Or
+ // 2.2 Send message synchronous, not recommended
+ // MessageSentResult result = producer.sendMessage(message);
+ // totalSentCnt.incrementAndGet();
+ // if (!result.isSuccess()) {
+ // logger.error("Sync-send message failed!" + result.getErrMsg());
+ // }
+ } catch (TubeClientException | InterruptedException e) {
+ logger.error("Send message failed!", e);
}
+ // 3 Cool sending
+ // Attention: only used in the test link, to solve the problem of
+ // frequent sending failures caused by insufficient test resources.
+ MixedUtils.coolSending(sentCount);
}
try {
producer.shutdown();
} catch (Throwable e) {
logger.error("producer shutdown error: ", e);
}
-
+ logger.info("The message sending task(" + indexId + ") has been completed!");
}
}
- private class DefaultSendCallback implements MessageSentCallback {
+ private static class DefaultSendCallback implements MessageSentCallback {
+
@Override
public void onMessageSent(MessageSentResult result) {
- TOTAL_COUNTER.incrementAndGet();
+ totalSentCnt.incrementAndGet();
if (result.isSuccess()) {
- SENT_SUCC_COUNTER.incrementAndGet();
+ msgSendStats.addMsgCount(result.getMessage().getTopic(), 1);
} else {
- SENT_FAIL_COUNTER.incrementAndGet();
+ logger.error("Send message failed!" + result.getErrMsg());
}
}
@Override
public void onException(Throwable e) {
- TOTAL_COUNTER.incrementAndGet();
- SENT_EXCEPT_COUNTER.incrementAndGet();
+ totalSentCnt.incrementAndGet();
logger.error("Send message error!", e);
}
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java
deleted file mode 100644
index f251a67..0000000
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.example;
-
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.inlong.tubemq.client.common.PeerInfo;
-import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
-import org.apache.inlong.tubemq.client.consumer.MessageListener;
-import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
-import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
-import org.apache.inlong.tubemq.corebase.Message;
-import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This demo shows how to consume message sequentially.
- *
- *Consumer supports subscribe multiple topics in one consume group. Message from subscription
- * sent back to business logic via callback {@link MessageListener}. It is highly recommended NOT
- * to perform any blocking operation inside the callback.
- *
- *As for consumption control of {@link PushMessageConsumer}, business logic is able to monitor
- * current state and adjust consumption by
- *
- *<ul>
- * <li>call {@link PushMessageConsumer#pauseConsume()} to pause consumption when high water mark exceeded.</li>
- * <li>call {@link PushMessageConsumer#resumeConsume()} to resume consumption</li>
- * </ul>
- */
-public final class MessageConsumerExample {
-
- private static final Logger logger =
- LoggerFactory.getLogger(MessageConsumerExample.class);
- private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
-
- private final PushMessageConsumer messageConsumer;
- private final MessageSessionFactory messageSessionFactory;
-
- public MessageConsumerExample(String masterHostAndPort,
- String group, int fetchThreadCnt) throws Exception {
- ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
- consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
- if (fetchThreadCnt > 0) {
- consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
- }
- this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
- this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
- }
-
- public static void main(String[] args) {
- final String masterServers = args[0];
- final String topics = args[1];
- final String group = args[2];
- final int clientCount = Integer.parseInt(args[3]);
- int threadCnt = -1;
- if (args.length > 5) {
- threadCnt = Integer.parseInt(args[4]);
- }
- final int fetchThreadCnt = threadCnt;
- final Map<String, TreeSet<String>> topicAndFiltersMap =
- MixedUtils.parseTopicParam(topics);
- final ExecutorService executorService =
- Executors.newCachedThreadPool();
- for (int i = 0; i < clientCount; i++) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- MessageConsumerExample messageConsumer =
- new MessageConsumerExample(masterServers,
- group, fetchThreadCnt);
- messageConsumer.subscribe(topicAndFiltersMap);
- } catch (Exception e) {
- logger.error("Create consumer failed!", e);
- }
- }
- });
- }
- final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread");
- statisticThread.start();
-
- executorService.shutdown();
- try {
- executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.error("Thread Pool shutdown has been interrupted!");
- }
- msgRecvStats.stopStats();
- }
-
- public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
- for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
- MessageListener messageListener = new DefaultMessageListener(entry.getKey());
- messageConsumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
- }
- messageConsumer.completeSubscribe();
- }
-
- public static class DefaultMessageListener implements MessageListener {
-
- private String topic;
-
- public DefaultMessageListener(String topic) {
- this.topic = topic;
- }
-
- @Override
- public void receiveMessages(PeerInfo peerInfo, List<Message> messages) {
- if (messages != null && !messages.isEmpty()) {
- msgRecvStats.addMsgCount(this.topic, messages.size());
- }
- }
-
- @Override
- public Executor getExecutor() {
- return null;
- }
-
- @Override
- public void stop() {
- }
- }
-}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
index 18462c7..afffde7 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
@@ -17,17 +17,11 @@
package org.apache.inlong.tubemq.example;
-import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.StringUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
@@ -35,9 +29,7 @@ import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,144 +44,95 @@ public final class MessageProducerExample {
private static final Logger logger =
LoggerFactory.getLogger(MessageProducerExample.class);
- private static final ConcurrentHashMap<String, AtomicLong> counterMap =
- new ConcurrentHashMap<>();
+ private static final MsgSendReceiveStats msgSendStats =
+ new MsgSendReceiveStats(true);
- private final MessageProducer messageProducer;
- private final MessageSessionFactory messageSessionFactory;
+ private static MessageSessionFactory sessionFactory;
+ private static MessageProducer messageProducer;
- public MessageProducerExample(String masterServers) throws Exception {
- TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
- this.messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
- this.messageProducer = messageSessionFactory.createProducer();
- }
-
- public static void main(String[] args) {
- // get and initial parameters
+ // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+ // the master address(es) to connect to;
+ // The 2nd parameter pubTopicAndFilterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+ // the topic(s) (and filter condition set) to publish to.
+ // The 3rd parameter msgCount is the message amount that needs to be sent
+ // The 4th parameter pkgSize is the message's body size that needs to be sent
+ public static void main(String[] args) throws Throwable {
+ // 1. get and initial parameters
final String masterServers = args[0];
- final String topics = args[1];
+ final String pubTopicAndFilterItems = args[1];
final long msgCount = Long.parseLong(args[2]);
- final Map<String, TreeSet<String>> topicAndFiltersMap =
- MixedUtils.parseTopicParam(topics);
- // initial send target
- final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
- // initial topic send round
- for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
- if (entry.getValue().isEmpty()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
- } else {
- for (String filter : entry.getValue()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
- }
- }
- }
- // initial sent data
- String body = "This is a test message from single-session-factory.";
- byte[] bodyBytes = StringUtils.getBytesUtf8(body);
- final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
- while (dataBuffer.hasRemaining()) {
- int offset = dataBuffer.arrayOffset();
- dataBuffer.put(bodyBytes, offset, Math.min(dataBuffer.remaining(), bodyBytes.length));
- }
- dataBuffer.flip();
- // send messages
- try {
- long sentCount = 0;
- int roundIndex = 0;
- int targetCnt = topicSendRounds.size();
- MessageProducerExample messageProducer =
- new MessageProducerExample(masterServers);
- messageProducer.publishTopics(topicAndFiltersMap.keySet());
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
- while (msgCount < 0 || sentCount < msgCount) {
- roundIndex = (int) (sentCount++ % targetCnt);
- Tuple2<String, String> target = topicSendRounds.get(roundIndex);
- Message message = new Message(target.getF0(), dataBuffer.array());
- long currTimeMillis = System.currentTimeMillis();
- message.setAttrKeyVal("index", String.valueOf(sentCount));
- message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
- if (target.getF1() != null) {
- message.putSystemHeader(target.getF1(), sdf.format(new Date(currTimeMillis)));
- }
- try {
- // 1.1 next line sends message synchronously, which is not recommended
- // messageProducer.sendMessage(message);
- // 1.2 send message asynchronously, recommended
- messageProducer.sendMessageAsync(message,
- messageProducer.new DefaultSendCallback());
- } catch (Throwable e1) {
- logger.error("Send Message throw exception ", e1);
- }
- // only for test, delay inflight message's count
- if (sentCount % 20000 == 0) {
- ThreadUtils.sleep(4000);
- } else if (sentCount % 10000 == 0) {
- ThreadUtils.sleep(2000);
- } else if (sentCount % 2500 == 0) {
- ThreadUtils.sleep(300);
- }
- }
- ThreadUtils.sleep(20000);
- for (Map.Entry<String, AtomicLong> entry : counterMap.entrySet()) {
- logger.info(
- "********* Current {} Message sent count is {}",
- entry.getKey(),
- entry.getValue().get()
- );
- }
- } catch (Throwable e) {
- e.printStackTrace();
+ int pkgSize = 1024;
+ if (args.length > 3) {
+ pkgSize = MixedUtils.mid(Integer.parseInt(args[3]), 1, 1024 * 1024);
}
- }
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(pubTopicAndFilterItems);
- public void publishTopics(Set<String> topicSet) throws TubeClientException {
- this.messageProducer.publish(topicSet);
- }
+ // 2. initial configure, session factory object, and producer object
+ TubeClientConfig clientConfig =
+ new TubeClientConfig(masterServers);
+ sessionFactory = new TubeSingleSessionFactory(clientConfig);
+ messageProducer = sessionFactory.createProducer();
+ messageProducer.publish(topicAndFiltersMap.keySet());
+
+ // 3. initial and statistic thread
+ Thread statisticThread =
+ new Thread(msgSendStats, "Sent Statistic Thread");
+ statisticThread.start();
+
+ // 4. build the content of the message to be sent
+ // include message body, attributes, and time information template
+ final byte[] bodyData =
+ MixedUtils.buildTestData(pkgSize);
+ List<Tuple2<String, String>> buildTopicFilterTuples =
+ MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
- /**
- * Send message synchronous.
- */
- public void sendMessage(Message message) {
- // date format is accurate to minute, not to second
- try {
- MessageSentResult result = messageProducer.sendMessage(message);
- if (!result.isSuccess()) {
- logger.error("Sync-send message failed!" + result.getErrMsg());
+ // 5. send message to server
+ long sentCount = 0;
+ int roundIndex = 0;
+ int targetCnt = buildTopicFilterTuples.size();
+ while (msgCount < 0 || sentCount < msgCount) {
+ // 5.1 Rotate to get the attribute information to be sent
+ roundIndex = (int) (sentCount++ % targetCnt);
+ Tuple2<String, String> target = buildTopicFilterTuples.get(roundIndex);
+
+ // 5.2 send message
+ try {
+ // 5.2.1 Send data asynchronously, recommended
+ messageProducer.sendMessage(MixedUtils.buildMessage(target.getF0(),
+ target.getF1(), bodyData, sentCount, sdf), new DefaultSendCallback());
+ // Or
+ // 5.2.2 Send message synchronous, not recommended
+ // MessageSentResult result = messageProducer.sendMessage(message);
+ // if (!result.isSuccess()) {
+ // logger.error("Sync-send message failed!" + result.getErrMsg());
+ // }
+ } catch (TubeClientException | InterruptedException e) {
+ logger.error("Send message failed!", e);
}
- } catch (TubeClientException | InterruptedException e) {
- logger.error("Sync-send message failed!", e);
- }
- }
- /**
- * Send message asynchronous. More efficient and recommended.
- */
- public void sendMessageAsync(Message message, MessageSentCallback callback) {
- try {
- messageProducer.sendMessage(message, callback);
- } catch (TubeClientException | InterruptedException e) {
- logger.error("Async-send message failed!", e);
+ // 5.3 Cool sending
+ // Attention: only used in the test link, to solve the problem of
+ // frequent sending failures caused by insufficient test resources.
+ MixedUtils.coolSending(sentCount);
}
+
+ // 6. Clean up resources and exit the service after the task is completed
+ // Attention: TubeMQ client is suitable for serving as a resident service,
+ // not suitable for creating Producer objects message by message
+ messageProducer.shutdown();
+ sessionFactory.shutdown();
+ msgSendStats.stopStats();
+ logger.info("The message sending task has been completed!");
}
- private class DefaultSendCallback implements MessageSentCallback {
+ private static class DefaultSendCallback implements MessageSentCallback {
@Override
public void onMessageSent(MessageSentResult result) {
if (result.isSuccess()) {
- String topicName = result.getMessage().getTopic();
-
- AtomicLong currCount = counterMap.get(topicName);
- if (currCount == null) {
- AtomicLong tmpCount = new AtomicLong(0);
- currCount = counterMap.putIfAbsent(topicName, tmpCount);
- if (currCount == null) {
- currCount = tmpCount;
- }
- }
- if (currCount.incrementAndGet() % 1000 == 0) {
- logger.info("Send " + topicName + " " + currCount.get() + " message!");
- }
+ msgSendStats.addMsgCount(result.getMessage().getTopic(), 1);
} else {
logger.error("Send message failed!" + result.getErrMsg());
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
index 4c243fe..bf51d9e 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
@@ -18,15 +18,14 @@
package org.apache.inlong.tubemq.example;
import static org.apache.inlong.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
+
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumeOffsetInfo;
import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.inlong.tubemq.corebase.Message;
@@ -35,117 +34,155 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This demo shows how to consume message by pull.
+ * This demo shows how to consume messages through TubeSingleSessionFactory + PullMessageConsumer
*
*Consume message in pull mode achieved by {@link PullMessageConsumer#getMessage()}.
* Note that whenever {@link PullMessageConsumer#getMessage()} returns successfully, the
- * return value(whether or not to be {@code null}) should be processed by
+ * return value(whether or not to be {@code null}) must be processed by
* {@link PullMessageConsumer#confirmConsume(String, boolean)}.
*/
public final class MessagePullConsumerExample {
private static final Logger logger =
LoggerFactory.getLogger(MessagePullConsumerExample.class);
- private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
-
- private final PullMessageConsumer messagePullConsumer;
- private final MessageSessionFactory messageSessionFactory;
-
- public MessagePullConsumerExample(String masterHostAndPort, String group) throws Exception {
- ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
- consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
- this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
- this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
- }
+ private static final MsgSendReceiveStats msgRcvStats =
+ new MsgSendReceiveStats(false);
+ private static PullMessageConsumer pullConsumer;
+ private static MessageSessionFactory sessionFactory;
+
+ // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+ // the master address(es) to connect to;
+ // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+ // the topic(s) (and filter condition set) to consume on.
+ // The 3rd parameter groupName is the name of consumer group
+ // The 4th parameter consumeCount is the amount of messages that need to be consumed
+ // The 5th parameter fetchThreadCnt is the count of fetch thread
public static void main(String[] args) throws Throwable {
- // get and initial parameters
+ // 1. get and initial parameters
final String masterServers = args[0];
- final String topics = args[1];
- final String group = args[2];
- final int msgCount = Integer.parseInt(args[3]);
+ final String subTopicAndFiterItems = args[1];
+ final String groupName = args[2];
+ final int consumeCount = Integer.parseInt(args[3]);
+ int fetchThreadCnt = 3;
+ if (args.length > 4) {
+ fetchThreadCnt = MixedUtils.mid(Integer.parseInt(args[4]),
+ 1, Runtime.getRuntime().availableProcessors());
+ }
final Map<String, TreeSet<String>> topicAndFiltersMap =
- MixedUtils.parseTopicParam(topics);
- // initial consumer object
- final MessagePullConsumerExample messageConsumer =
- new MessagePullConsumerExample(masterServers, group);
- messageConsumer.subscribe(topicAndFiltersMap);
- Thread[] fetchRunners = new Thread[3];
+ MixedUtils.parseTopicParam(subTopicAndFiterItems);
+
+ // 2. initial configure and build session factory object
+ ConsumerConfig consumerConfig =
+ new ConsumerConfig(masterServers, groupName);
+ // 2.1 set consume from latest position if the consumer group is first consume
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ // 2.2 build session factory object
+ // Attention: here we are using the TubeSingleSessionFactory object(a
+ // singleton session factory can only create one object in a process,
+ // requiring all topics to be in one cluster.
+ // If the topics subscribed to by the objects in the process are
+ // in different clusters, then need to use the TubeMultiSessionFactory class,
+ // please refer to the example of TubeMultiSessionFactory usage)
+ sessionFactory = new TubeSingleSessionFactory(consumerConfig);
+
+ // 3 build consumer object
+ // we can construct multiple consumers after the creation of the session factory object
+ pullConsumer = sessionFactory.createPullConsumer(consumerConfig);
+ // 3.1 Set the Topic and the filter item set corresponding to the consumption
+ // if you not need filter consumption,
+ // set the parameter filterConds is null or empty set
+ for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+ pullConsumer.subscribe(entry.getKey(), entry.getValue());
+ }
+ // 3.2 start consumption
+ pullConsumer.completeSubscribe();
+
+ // 4. initial fetch threads
+ Thread[] fetchRunners = new Thread[fetchThreadCnt];
for (int i = 0; i < fetchRunners.length; i++) {
- fetchRunners[i] = new Thread(new FetchRequestRunner(messageConsumer, msgCount));
- fetchRunners[i].setName("_fetch_runner_" + i);
+ fetchRunners[i] = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ ConsumerResult csmResult;
+ int getCount = consumeCount;
+ // wait partition status ready
+ do {
+ if (pullConsumer.isPartitionsReady(5000)
+ || pullConsumer.isShutdown()) {
+ break;
+ }
+ } while (true);
+ // consume messages
+ do {
+ try {
+ // 4.1 judge consumer is shutdown
+ if (pullConsumer.isShutdown()) {
+ logger.warn("Consumer is shutdown!");
+ break;
+ }
+ // 4.2 get messages from server
+ csmResult = pullConsumer.getMessage();
+ if (csmResult.isSuccess()) {
+ // 4.2.1 process message if getMessage() return success
+ List<Message> messageList = csmResult.getMessageList();
+ if (messageList != null && !messageList.isEmpty()) {
+ msgRcvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+ }
+ // 4.2.1.1 confirm consume result
+ // Notice:
+ // 1. If the processing fails, the parameter isConsumed can
+ // be set to false, but this is likely to cause
+ // an infinite loop of data consumption, so
+ // it is strongly recommended to set this parameter
+ // to true when using it, and the failed data can
+ // be processed in other ways
+ // 2. The messageList returned by getMessage() may be empty,
+ // and confirmConsume() is still required call in this case
+ pullConsumer.confirmConsume(csmResult.getConfirmContext(), true);
+ } else {
+ // 4.2.2 process failure when getMessage() return false
+ // Any failure can be ignored
+ if (!IGNORE_ERROR_SET.contains(csmResult.getErrCode())) {
+ logger.debug(
+ "Receive messages errorCode is {}, Error message is {}",
+ csmResult.getErrCode(), csmResult.getErrMsg());
+ }
+ }
+ // 4.3 Determine whether the consumed data reaches the goal
+ if (consumeCount > 0) {
+ if (--getCount <= 0) {
+ logger.info("Consumer has consumed {} messages!", consumeCount);
+ break;
+ }
+ }
+ } catch (Throwable e) {
+ // Any exceptions in running can be ignored
+ }
+ } while (true);
+ logger.info("The fetch thread has exited!");
+ }
+ }, "_fetch_runner_" + i);
}
- // initial fetch threads
+
+ // 5. start fetch threads
for (Thread thread : fetchRunners) {
thread.start();
}
- // initial statistic thread
+
+ // 6. initial and statistic thread
Thread statisticThread =
- new Thread(msgRecvStats, "Sent Statistic Thread");
+ new Thread(msgRcvStats, "Receive Statistic Thread");
statisticThread.start();
- }
-
- public void subscribe(
- Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
- for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
- messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
- }
- messagePullConsumer.completeSubscribe();
- }
-
- public ConsumerResult getMessage() throws TubeClientException {
- return messagePullConsumer.getMessage();
- }
-
- public ConsumerResult confirmConsume(final String confirmContext,
- boolean isConsumed) throws TubeClientException {
- return messagePullConsumer.confirmConsume(confirmContext, isConsumed);
- }
-
- public Map<String, ConsumeOffsetInfo> getCurrPartitionOffsetMap() throws TubeClientException {
- return messagePullConsumer.getCurConsumedPartitions();
- }
- private static class FetchRequestRunner implements Runnable {
-
- final MessagePullConsumerExample messageConsumer;
- final int consumeCount;
-
- FetchRequestRunner(final MessagePullConsumerExample messageConsumer, int msgCount) {
- this.messageConsumer = messageConsumer;
- this.consumeCount = msgCount;
- }
-
- @Override
- public void run() {
- try {
- int getCount = consumeCount;
- do {
- ConsumerResult result = messageConsumer.getMessage();
- if (result.isSuccess()) {
- List<Message> messageList = result.getMessageList();
- if (messageList != null && !messageList.isEmpty()) {
- msgRecvStats.addMsgCount(result.getTopicName(), messageList.size());
- }
- messageConsumer.confirmConsume(result.getConfirmContext(), true);
- } else {
- if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
- logger.info(
- "Receive messages errorCode is {}, Error message is {}",
- result.getErrCode(), result.getErrMsg());
- }
- }
- if (consumeCount > 0) {
- if (--getCount <= 0) {
- break;
- }
- }
- } while (true);
- msgRecvStats.stopStats();
- } catch (TubeClientException e) {
- logger.error("Create consumer failed!", e);
- }
- }
+ // 7. Resource cleanup when exiting the service
+ //
+ // 7.1 shutdown consumers
+ // pullConsumer.shutdown();
+ // 7.2 shutdown session factory
+ // sessionFactory.shutdown();
+ // 7.3 shutdown statistic thread
+ // msgRecvStats.stopStats();
}
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
index d50b431..2bc7080 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
@@ -17,29 +17,31 @@
package org.apache.inlong.tubemq.example;
-import static org.apache.inlong.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumeOffsetInfo;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.cluster.MasterInfo;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This demo shows how to reset offset on consuming. The main difference from {@link MessagePullConsumerExample}
+ * This demo shows how to consume messages with bootstrap offset through
+ * TubeMultiSessionFactory + PullMessageConsumer
+ *
+ * First, we start a consumer for data consumption, and then we exit it, start another consumer
+ * with the same group name, and set the partition's bootstrap offset to 0 for data consumption.
+ *
+ * The main difference from {@link MessagePullConsumerExample}
* is that we call {@link PullMessageConsumer#completeSubscribe(String, int, boolean, Map)} instead of
* {@link PullMessageConsumer#completeSubscribe()}. The former supports multiple options to configure
* when to reset offset.
@@ -48,141 +50,214 @@ public final class MessagePullSetConsumerExample {
private static final Logger logger =
LoggerFactory.getLogger(MessagePullSetConsumerExample.class);
- private static final AtomicLong counter = new AtomicLong(0);
-
- private final PullMessageConsumer messagePullConsumer;
- private final MessageSessionFactory messageSessionFactory;
-
- public MessagePullSetConsumerExample(String masterHostAndPort,
- String group) throws Exception {
- ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
- this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
- this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
- }
+ // Statistic object
+ private static final MsgSendReceiveStats msgRcvStats =
+ new MsgSendReceiveStats(false);
+ // The map of the master cluster and Multiple session factory
+ // There may be multiple consumers in the same process and the topic sets subscribed
+ // by different consumers are in different clusters. In this case,
+ // we need to construct session factories by TubeMultiSessionFactory class.
+ private static final ConcurrentHashMap<String, MessageSessionFactory> multSessFtyMap =
+ new ConcurrentHashMap<>();
- public static void main(String[] args) {
- // get and initial parameters
+ // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+ // the master address(es) to connect to;
+ // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+ // the topic(s) (and filter condition set) to consume on.
+ // The 3rd parameter groupName is the name of consumer group
+ // The 4th parameter consumeCount is the amount of messages that need to be consumed
+ // The 5th parameter fetchThreadCnt is the count of fetch thread
+ public static void main(String[] args) throws Throwable {
+ // 1. get and initial parameters
final String masterServers = args[0];
- final String topics = args[1];
- final String group = args[2];
- final int msgCount = Integer.parseInt(args[3]);
+ final String subTopicAndFiterItems = args[1];
+ final String groupName = args[2];
+ final int consumeCount = Integer.parseInt(args[3]);
+ int fetchThreadCnt = 3;
+ if (args.length > 4) {
+ fetchThreadCnt = MixedUtils.mid(Integer.parseInt(args[4]),
+ 1, Runtime.getRuntime().availableProcessors());
+ }
final Map<String, TreeSet<String>> topicAndFiltersMap =
- MixedUtils.parseTopicParam(topics);
- // initial reset offset parameters
- // (The offset specified is only a demo)
- final Map<String, Long> partOffsetMap =
+ MixedUtils.parseTopicParam(subTopicAndFiterItems);
+
+ // 2. initial and statistic thread
+ Thread statisticThread =
+ new Thread(msgRcvStats, "Receive Statistic Thread");
+ statisticThread.start();
+
+ // 3. Start the consumer group for the first consumption
+ // 3.1. build consumer object
+ logger.info("The first consumption begin!");
+ PullMessageConsumer consumer1 =
+ createPullConsumer(masterServers, groupName);
+ // 3.2. Set the Topic and the filter item set corresponding to the consumption
+ // if you not need filter consumption,
+ // set the parameter filterConds is null or empty set
+ for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+ consumer1.subscribe(entry.getKey(), entry.getValue());
+ }
+ // 3.3. start consumption with
+ String sessionKeyFst = "test_consume_first";
+ int sourceCountFst = 1;
+ boolean isSelectBig = false;
+ // The map of partitionKey and last success offset
+ // You can persist the information and use it when restarting or
+ // re-rolling to keep the current consumption to start from
+ // the offset required in the last round
+ ConcurrentHashMap<String, Long> partOffsetMapFst =
new ConcurrentHashMap<>();
- partOffsetMap.put("123:test_1:0", 0L);
- partOffsetMap.put("123:test_1:1", 0L);
- partOffsetMap.put("123:test_1:2", 0L);
- partOffsetMap.put("123:test_2:0", 350L);
- partOffsetMap.put("123:test_2:1", 350L);
- partOffsetMap.put("123:test_2:2", 350L);
-
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- int getCount = msgCount;
- MessagePullSetConsumerExample messageConsumer =
- new MessagePullSetConsumerExample(masterServers, group);
- messageConsumer.subscribe(topicAndFiltersMap, partOffsetMap);
- // main logic of consuming
- do {
- ConsumerResult result = messageConsumer.getMessage();
- if (result.isSuccess()) {
- List<Message> messageList = result.getMessageList();
- if (messageList != null) {
- logger.info("Receive messages:" + counter.addAndGet(messageList.size()));
- }
-
- // Offset returned by GetMessage represents the initial offset of this request
- // if consumer group is pure Pull mode, the initial offset can be saved;
- // if not, we have to use the return value of confirmConsume
- long oldValue = partOffsetMap.get(
- result.getPartitionKey()) == null
- ? -1
- : partOffsetMap.get(result.getPartitionKey());
- partOffsetMap.put(result.getPartitionKey(), result.getCurrOffset());
- logger.info(
- "GetMessage , partitionKey={}, oldValue={}, newVal={}",
- new Object[]{result.getPartitionKey(), oldValue, result.getCurrOffset()});
-
- // save the Offset from the return value of confirmConsume
- ConsumerResult confirmResult = messageConsumer.confirmConsume(
- result.getConfirmContext(),
- true);
- if (confirmResult.isSuccess()) {
- oldValue = partOffsetMap.get(
- result.getPartitionKey()) == null
- ? -1
- : partOffsetMap.get(result.getPartitionKey());
- partOffsetMap.put(result.getPartitionKey(), confirmResult.getCurrOffset());
- logger.info(
- "ConfirmConsume , partitionKey={}, oldValue={}, newVal={}",
- new Object[]{
- confirmResult.getPartitionKey(),
- oldValue,
- confirmResult.getCurrOffset()});
- } else {
- logger.info(
- "ConfirmConsume failure, errCode is {}, errInfo is {}.",
- confirmResult.getErrCode(),
- confirmResult.getErrMsg());
- }
- } else {
- if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
- logger.info(
- "Receive messages errorCode is {}, Error message is {}",
- result.getErrCode(), result.getErrMsg());
- }
- }
- if (msgCount >= 0) {
- if (--getCount <= 0) {
- break;
- }
- }
- } while (true);
- } catch (Exception e) {
- logger.error("Create consumer failed!", e);
- }
- }
- });
+ consumer1.completeSubscribe(sessionKeyFst,
+ sourceCountFst, isSelectBig, partOffsetMapFst);
- executorService.shutdown();
- try {
- executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.error("Thread Pool shutdown has been interrupted!");
+ // 3.4. initial and start fetch threads
+ Thread[] fetchRunners1 = new Thread[fetchThreadCnt];
+ for (int i = 0; i < fetchRunners1.length; i++) {
+ fetchRunners1[i] = new Thread(new FetchRequestRunner(
+ consumer1, partOffsetMapFst, consumeCount), "_fetch_runner_" + i);
+ }
+ for (Thread thread : fetchRunners1) {
+ thread.start();
+ }
+
+ // 3.5. wait consume data
+ ThreadUtils.sleep(2 * 60 * 1000);
+ logger.info("The first consumption has finished!");
+
+ // 3.6. shutdown consumer
+ consumer1.shutdown();
+ for (Thread thread : fetchRunners1) {
+ thread.join();
}
- }
- public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap,
- Map<String, Long> partOffsetMap) throws TubeClientException {
+ // 4. Start the consumer group for the second consumption
+ // 4.1 set the boostrap Offset, here we set consumption from 0
+ logger.info("The second consumption begin!");
+ String sessionKeySec = "test_consume_Second";
+ int sourceCountSec = 1;
+ ConcurrentHashMap<String, Long> partOffsetMapSec =
+ new ConcurrentHashMap<>();
+ for (String partKey : partOffsetMapFst.keySet()) {
+ partOffsetMapSec.put(partKey, 0L);
+ }
+ // 4.2. build consumer object
+ PullMessageConsumer consumer2 =
+ createPullConsumer(masterServers, groupName);
+ // 4.3. Set the Topic and the filter item set corresponding to the consumption
for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
- messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
+ consumer2.subscribe(entry.getKey(), entry.getValue());
}
- String sessionKey = "test_reset2";
- int consumerCount = 2;
- boolean isSelectBig = false;
- messagePullConsumer.completeSubscribe(sessionKey,
- consumerCount, isSelectBig, partOffsetMap);
- }
+ consumer2.completeSubscribe(sessionKeySec,
+ sourceCountSec, isSelectBig, partOffsetMapSec);
- public ConsumerResult getMessage() throws TubeClientException {
- return messagePullConsumer.getMessage();
+ // 4.4. initial and start fetch threads
+ Thread[] fetchRunners2 = new Thread[fetchThreadCnt];
+ for (int i = 0; i < fetchRunners2.length; i++) {
+ fetchRunners2[i] = new Thread(new FetchRequestRunner(
+ consumer2, partOffsetMapSec, consumeCount), "_fetch_runner_" + i);
+ }
+ for (Thread thread : fetchRunners2) {
+ thread.start();
+ }
}
- public ConsumerResult confirmConsume(
- String confirmContext,
- boolean isConsumed
- ) throws TubeClientException {
- return messagePullConsumer.confirmConsume(confirmContext, isConsumed);
+ // consumer object creation function
+ private static PullMessageConsumer createPullConsumer(
+ String masterHostAndPorts, String groupName) throws Exception {
+ // 1. initial configure and build session factory object
+ ConsumerConfig consumerConfig =
+ new ConsumerConfig(masterHostAndPorts, groupName);
+ // 1.2 set consume from latest position if the consumer group is first consume
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ // 2. build session factory object
+ // find and initialize TubeMultiSessionFactory object according to the Master cluster information
+ MasterInfo masterInfo = consumerConfig.getMasterInfo();
+ MessageSessionFactory sessionFactory =
+ multSessFtyMap.get(masterInfo.getMasterClusterStr());
+ if (sessionFactory == null) {
+ MessageSessionFactory tmpSessionFactory =
+ new TubeMultiSessionFactory(consumerConfig);
+ sessionFactory = multSessFtyMap.putIfAbsent(
+ masterInfo.getMasterClusterStr(), tmpSessionFactory);
+ if (sessionFactory == null) {
+ sessionFactory = tmpSessionFactory;
+ } else {
+ tmpSessionFactory.shutdown();
+ }
+ }
+ // 3. Create and get the PullMessageConsumer object
+ return sessionFactory.createPullConsumer(consumerConfig);
}
- public Map<String, ConsumeOffsetInfo> getCurrPartitionOffsetMap() throws TubeClientException {
- return messagePullConsumer.getCurConsumedPartitions();
+ // fetch message runner
+ private static class FetchRequestRunner implements Runnable {
+
+ final PullMessageConsumer pullConsumer;
+ final ConcurrentHashMap<String, Long> partOffsetMap;
+ final int consumeCount;
+
+ FetchRequestRunner(PullMessageConsumer messageConsumer,
+ ConcurrentHashMap<String, Long> partOffsetMap,
+ int msgCount) {
+ this.pullConsumer = messageConsumer;
+ this.partOffsetMap = partOffsetMap;
+ this.consumeCount = msgCount;
+ }
+
+ @Override
+ public void run() {
+ ConsumerResult csmResult;
+ ConsumerResult cfmResult;
+ int getCount = consumeCount;
+ // wait partition status ready
+ do {
+ if (pullConsumer.isPartitionsReady(5000)
+ || pullConsumer.isShutdown()) {
+ break;
+ }
+ } while (true);
+ // consume messages
+ do {
+ try {
+ // 1 judge consumer is shutdown
+ if (pullConsumer.isShutdown()) {
+ logger.warn("Consumer is shutdown!");
+ break;
+ }
+ // 2 get messages from server
+ csmResult = pullConsumer.getMessage();
+ if (csmResult.isSuccess()) {
+ // 2.1 process message if getMessage() return success
+ List<Message> messageList = csmResult.getMessageList();
+ if (messageList != null && !messageList.isEmpty()) {
+ msgRcvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+ }
+ // 2.1.2 store the offset of processing message
+ // the offset returned by GetMessage() represents the initial offset of this request
+ // if consumer group is pure Pull mode, the initial offset can be saved;
+ // if not, we have to use the return value of confirmConsume()
+ partOffsetMap.put(csmResult.getPartitionKey(), csmResult.getCurrOffset());
+ // 2.1.3 confirm consume result
+ cfmResult = pullConsumer.confirmConsume(
+ csmResult.getConfirmContext(), true);
+ if (cfmResult.isSuccess()) {
+ // store confirmed offset value
+ partOffsetMap.put(csmResult.getPartitionKey(), csmResult.getCurrOffset());
+ }
+ }
+ // 3. Determine whether the consumed data reaches the goal
+ if (consumeCount > 0) {
+ if (--getCount <= 0) {
+ logger.info("Consumer has consumed {} messages!", consumeCount);
+ break;
+ }
+ }
+ } catch (Throwable e) {
+ // Any exceptions in running can be ignored
+ }
+ } while (true);
+ logger.info("The fetch thread has exited!");
+ }
}
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
new file mode 100644
index 0000000..94c8d15
--- /dev/null
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.example;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.Executor;
+import org.apache.inlong.tubemq.client.common.PeerInfo;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.MessageListener;
+import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+
+/**
+ * This demo shows how to consume messages sequentially in Push mode.
+ *
+ *Consumer supports subscribe multiple topics in one consume group. Message from subscription
+ * sent back to business logic via callback {@link MessageListener}. It is highly recommended NOT
+ * to perform any blocking operation inside the callback.
+ *
+ *As for consumption control of {@link PushMessageConsumer}, business logic is able to monitor
+ * current state and adjust consumption by
+ *
+ *<ul>
+ * <li>call {@link PushMessageConsumer#pauseConsume()} to pause consumption when high water mark exceeded.</li>
+ * <li>call {@link PushMessageConsumer#resumeConsume()} to resume consumption</li>
+ * </ul>
+ */
+public final class MessagePushConsumerExample {
+
+ private static final MsgSendReceiveStats msgRcvStats =
+ new MsgSendReceiveStats(false);
+ private static MessageSessionFactory sessionFactory;
+ private static final Map<String, PushMessageConsumer> consumerMap = new HashMap<>();
+
+ // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+ // the master address(es) to connect to;
+ // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+ // the topic(s) (and filter condition set) to consume on.
+ // The 3rd parameter groupName is the name of consumer group
+ // The 4th parameter clientCount is the amount of consumer
+ // The 5th parameter fetchThreadCnt is the count of fetch thread
+ public static void main(String[] args) throws Throwable {
+ // 1. get and initial parameters
+ final String masterServers = args[0];
+ final String subTopicAndFiterItems = args[1];
+ final String groupName = args[2];
+ int clientCount = Integer.parseInt(args[3]);
+ if (clientCount <= 0) {
+ clientCount = 1;
+ }
+ int paraFetchThreadCnt = -1;
+ if (args.length > 5) {
+ paraFetchThreadCnt = Integer.parseInt(args[4]);
+ }
+ final int fetchThreadCnt = paraFetchThreadCnt;
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(subTopicAndFiterItems);
+
+ // 2. initial configure and build session factory object
+ ConsumerConfig consumerConfig =
+ new ConsumerConfig(masterServers, groupName);
+ // 2.1. set consume from latest position if the consumer group is first consume
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ // 2.2. set the fetch thread count of push consumer
+ if (fetchThreadCnt > 0) {
+ consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
+ }
+ // 2.3. build session factory object
+ sessionFactory = new TubeSingleSessionFactory(consumerConfig);
+
+ // 3. build and start consumer object
+ for (int i = 0; i < clientCount; i++) {
+ // 3.1. build and start consumer object
+ PushMessageConsumer consumer =
+ sessionFactory.createPushConsumer(consumerConfig);
+ // 3.2. set subscribed topic and Listener
+ for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+ MessageListener messageListener = new DefaultMessageListener(entry.getKey());
+ consumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
+ }
+ // 3.3 start consumer
+ consumer.completeSubscribe();
+ // 3.4 store consumer object
+ consumerMap.put(consumer.getConsumerId(), consumer);
+ }
+
+ // 4. initial and statistic thread
+ Thread statisticThread =
+ new Thread(msgRcvStats, "Receive Statistic Thread");
+ statisticThread.start();
+
+ // 5. Resource cleanup when exiting the service
+ //
+ // 5.1 shutdown consumers
+ // for (PushMessageConsumer consumer : consumerMap.values()) {
+ // consumer.shutdown();
+ // }
+ // 5.2 shutdown session factory
+ // sessionFactory.shutdown();
+ // 5.3 shutdown statistic thread
+ // msgRecvStats.stopStats();
+ }
+
+ // Message callback processing class.
+ // After the SDK receives the message, it will pass the message back to the business layer
+ // for message processing by calling the receiveMessages() API of this class
+ public static class DefaultMessageListener implements MessageListener {
+
+ private final String topic;
+
+ public DefaultMessageListener(String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public void receiveMessages(PeerInfo peerInfo, List<Message> messages) {
+ if (messages != null && !messages.isEmpty()) {
+ msgRcvStats.addMsgCount(this.topic, messages.size());
+ }
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ }
+ }
+}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
similarity index 69%
rename from inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
rename to inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
index 9074026..cfc8a55 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
@@ -28,19 +28,24 @@ import org.slf4j.LoggerFactory;
/**
* This demo shows how to collect and report message received statistics.
*/
-public class MsgRecvStats implements Runnable {
- private static final Logger logger = LoggerFactory.getLogger(MsgRecvStats.class);
+public class MsgSendReceiveStats implements Runnable {
+ private final boolean isProducer;
+ private static final Logger logger = LoggerFactory.getLogger(MsgSendReceiveStats.class);
private static final ConcurrentHashMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, AtomicLong> befCountMap = new ConcurrentHashMap<>();
- private AtomicBoolean isStarted = new AtomicBoolean(false);
+ private final AtomicBoolean isStarted = new AtomicBoolean(true);
+
+ public MsgSendReceiveStats(boolean isProducer) {
+ this.isProducer = isProducer;
+ }
@Override
public void run() {
+ // Indicator output every 30 seconds
while (isStarted.get()) {
try {
for (Map.Entry<String, AtomicLong> entry : counterMap.entrySet()) {
long currCount = entry.getValue().get();
-
AtomicLong befCount = befCountMap.get(entry.getKey());
if (befCount == null) {
AtomicLong tmpCount = new AtomicLong(0);
@@ -49,10 +54,14 @@ public class MsgRecvStats implements Runnable {
befCount = tmpCount;
}
}
-
- logger.info("********* Current {} Message receive count is {}, dlt is {}",
- new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
-
+ if (isProducer) {
+ logger.info("********* Current {} Message sent count is {}, dlt is {}",
+ new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
+ } else {
+ logger.info("********* Current {} Message received count is {}, dlt is {}",
+ new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
+ }
+ befCountMap.get(entry.getKey()).set(currCount);
}
} catch (Throwable t) {
// ignore
@@ -71,9 +80,13 @@ public class MsgRecvStats implements Runnable {
currCount = tmpCount;
}
}
-
- if (currCount.addAndGet(msgCnt) % 500 == 0) {
- logger.info("Receive messages:" + currCount.get());
+ // Indicator output every 1000
+ if (currCount.addAndGet(msgCnt) % 1000 == 0) {
+ if (isProducer) {
+ logger.info("Sent " + topicName + " messages:" + currCount.get());
+ } else {
+ logger.info("Received " + topicName + " messages:" + currCount.get());
+ }
}
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
index fbf19fd..0ac63a5 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
@@ -19,7 +19,6 @@ package org.apache.inlong.tubemq.server.tools.cli;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,7 +63,7 @@ public class CliProducer extends CliAbstractBase {
// sent data content
private static byte[] sentData;
private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
- private final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
+ private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
private final Map<MessageProducer, MsgSender> producerMap = new HashMap<>();
// cli parameters
@@ -181,21 +180,13 @@ public class CliProducer extends CliAbstractBase {
// initial tubemq client order by caller required
public void initTask() throws Exception {
- // initial sent data
- sentData = MixedUtils.buildTestData(msgDataSize);
// initial client configure
TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
clientConfig.setRpcTimeoutMs(rpcTimeoutMs);
+ // initial sent data
+ sentData = MixedUtils.buildTestData(msgDataSize);
// initial topic send round
- for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
- if (entry.getValue().isEmpty()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
- } else {
- for (String filter : entry.getValue()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
- }
- }
- }
+ topicSendRounds = MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
// initial send thread service
sendExecutorService =
Executors.newFixedThreadPool(sendThreadCnt, new ThreadFactory() {
@@ -265,13 +256,9 @@ public class CliProducer extends CliAbstractBase {
while (msgCount < 0 || sentCount < msgCount) {
roundIndex = (int) (sentCount++ % topicAndCondCnt);
try {
- long millis = System.currentTimeMillis();
Tuple2<String, String> target = topicSendRounds.get(roundIndex);
- Message message = new Message(target.getF0(), sentData);
- if (target.getF1() != null) {
- // if include filter, add filter item
- message.putSystemHeader(target.getF1(), sdf.format(new Date(millis)));
- }
+ Message message = MixedUtils.buildMessage(
+ target.getF0(), target.getF1(), sentData, sentCount, sdf);
// use sync or async process
if (syncProduction) {
MessageSentResult procResult =
@@ -293,15 +280,7 @@ public class CliProducer extends CliAbstractBase {
// Limit sending flow control to avoid frequent errors
// caused by too many inflight messages being sent
if (!withoutDelay) {
- if (sentCount % 5000 == 0) {
- ThreadUtils.sleep(3000);
- } else if (sentCount % 4000 == 0) {
- ThreadUtils.sleep(2000);
- } else if (sentCount % 2000 == 0) {
- ThreadUtils.sleep(800);
- } else if (sentCount % 1000 == 0) {
- ThreadUtils.sleep(400);
- }
+ MixedUtils.coolSending(sentCount);
}
}
// finished, close client