You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/05 12:26:59 UTC
[incubator-inlong] branch INLONG-1739 updated: [INLONG-1758]
Optimize the realization of class MessagePullSetConsumerExample (#1759)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-1739 by this push:
new 8b3e625 [INLONG-1758] Optimize the realization of class MessagePullSetConsumerExample (#1759)
8b3e625 is described below
commit 8b3e625f4aa54c837bf817068745744be497e3ca
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri Nov 5 20:26:44 2021 +0800
[INLONG-1758] Optimize the realization of class MessagePullSetConsumerExample (#1759)
---
.../tubemq/client/consumer/MessageConsumer.java | 32 ++
.../tubemq/example/MessagePullConsumerExample.java | 8 +
.../example/MessagePullSetConsumerExample.java | 334 +++++++++++++--------
.../tubemq/example/MessagePushConsumerExample.java | 8 +-
4 files changed, 248 insertions(+), 134 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
index 2f3a1ce..df98490 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
@@ -35,6 +35,11 @@ public interface MessageConsumer extends Shutdownable {
boolean isFilterConsume(String topic);
+ /**
+ * Get consume offset information of the current registered partitions
+ *
+ * @return consume offset information
+ */
Map<String, ConsumeOffsetInfo> getCurConsumedPartitions() throws TubeClientException;
/**
@@ -73,8 +78,35 @@ public interface MessageConsumer extends Shutdownable {
*/
Map<String, Long> getFrozenPartInfo();
+ /**
+ * Start consume messages with default setting
+ */
void completeSubscribe() throws TubeClientException;
+ /**
+ * Start consumption with the precise Offset settings
+ *
+ * The parameter sessionKey is specified by the caller, similar to the JobID in Flink,
+ * which is used to identify the unrelated offset reset consumption activities before and after.
+ * Each reset operation needs to ensure that it is different from the last reset carried sessionKey;
+ *
+ * The parameter sourceCount is used to inform the server how many consumers will consume
+ * in this round of consumer group activation, and the client will not consume data until
+ * the consumer group has reached the specified number of consumers.
+ *
+ * The parameter isSelectBig is used to inform the server that if multiple clients reset
+ * the offset to the same partition, the server will use the largest offset
+ * or the smallest offset as the standard;
+ *
+ * The parameter partOffsetMap is used to inform the server that this consumption expects
+ * the partitions in the Map to be consumed according to the specified offset value.
+ * The offset in the Map comes from the consumer's query from the server, or from the content
+ * returned when the consumer successfully consumed data before, including
+ * the PeerInfo object provided to the callback function during Push consumption and
+ * the PeerInfo object in the ConsumerResult class during Pull consumption.
+ * The Key in the Map is the partitionKey carried in PeerInfo, and the value is
+ * the currOffset value carried in PeerInfo.
+ */
void completeSubscribe(String sessionKey,
int sourceCount,
boolean isSelectBig,
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
index 0b2d71e..4e9674b 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
@@ -105,6 +105,14 @@ public final class MessagePullConsumerExample {
public void run() {
ConsumerResult csmResult;
int getCount = consumeCount;
+ // wait partition status ready
+ do {
+ if (pullConsumer.isPartitionsReady(5000)
+ || pullConsumer.isShutdown()) {
+ break;
+ }
+ } while (true);
+ // consume messages
do {
try {
// 4.1 judge consumer is shutdown
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
index d50b431..2654e75 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
@@ -17,29 +17,31 @@
package org.apache.inlong.tubemq.example;
-import static org.apache.inlong.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumeOffsetInfo;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.cluster.MasterInfo;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This demo shows how to reset offset on consuming. The main difference from {@link MessagePullConsumerExample}
+ * This demo shows how to consume messages with bootstrap offset through
+ * TubeMultiSessionFactory + PullMessageConsumer
+ *
+ * First, we start a consumer for data consumption, and then we exit it, start another consumer
+ * with the same group name, and set the partition's bootstrap offset to 0 for data consumption.
+ *
+ * The main difference from {@link MessagePullConsumerExample}
* is that we call {@link PullMessageConsumer#completeSubscribe(String, int, boolean, Map)} instead of
* {@link PullMessageConsumer#completeSubscribe()}. The former supports multiple options to configure
* when to reset offset.
@@ -48,141 +50,213 @@ public final class MessagePullSetConsumerExample {
private static final Logger logger =
LoggerFactory.getLogger(MessagePullSetConsumerExample.class);
- private static final AtomicLong counter = new AtomicLong(0);
-
- private final PullMessageConsumer messagePullConsumer;
- private final MessageSessionFactory messageSessionFactory;
-
- public MessagePullSetConsumerExample(String masterHostAndPort,
- String group) throws Exception {
- ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
- this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
- this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
- }
+ // Statistic object
+ private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+ // The map of the master cluster and Multiple session factory
+ // There may be multiple consumers in the same process and the topic sets subscribed
+ // by different consumers are in different clusters. In this case,
+ // we need to construct session factories by TubeMultiSessionFactory class.
+ private static final ConcurrentHashMap<String, MessageSessionFactory> multSessFtyMap =
+ new ConcurrentHashMap<>();
- public static void main(String[] args) {
- // get and initial parameters
+ // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+ // the master address(es) to connect to;
+ // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+ // the topic(s) (and filter condition set) to consume on.
+ // The 3rd parameter groupName is the name of consumer group
+ // The 4th parameter consumeCount is the amount of messages that need to be consumed
+ // The 5th parameter fetchThreadCnt is the count of fetch thread
+ public static void main(String[] args) throws Throwable {
+ // 1. get and initial parameters
final String masterServers = args[0];
- final String topics = args[1];
- final String group = args[2];
- final int msgCount = Integer.parseInt(args[3]);
+ final String subTopicAndFiterItems = args[1];
+ final String groupName = args[2];
+ final int consumeCount = Integer.parseInt(args[3]);
+ int fetchThreadCnt = 3;
+ if (args.length > 4) {
+ fetchThreadCnt = MixedUtils.mid(Integer.parseInt(args[4]),
+ 1, Runtime.getRuntime().availableProcessors());
+ }
final Map<String, TreeSet<String>> topicAndFiltersMap =
- MixedUtils.parseTopicParam(topics);
- // initial reset offset parameters
- // (The offset specified is only a demo)
- final Map<String, Long> partOffsetMap =
+ MixedUtils.parseTopicParam(subTopicAndFiterItems);
+
+ // 2. initialize and start the statistic thread
+ Thread statisticThread =
+ new Thread(msgRecvStats, "Sent Statistic Thread");
+ statisticThread.start();
+
+ // 3. Start the consumer group for the first consumption
+ // 3.1. build consumer object
+ logger.info("The first consumption begin!");
+ PullMessageConsumer consumer1 =
+ createPullConsumer(masterServers, groupName);
+ // 3.2. Set the Topic and the filter item set corresponding to the consumption
+ // if you do not need filter consumption,
+ // set the parameter filterConds to null or an empty set
+ for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+ consumer1.subscribe(entry.getKey(), entry.getValue());
+ }
+ // 3.3. start consumption with the session key, source count and (initially empty) offset map
+ String sessionKeyFst = "test_consume_first";
+ int sourceCountFst = 1;
+ boolean isSelectBig = false;
+ // The map of partitionKey and last success offset
+ // You can persist the information and use it when restarting or
+ // re-rolling to keep the current consumption to start from
+ // the offset required in the last round
+ ConcurrentHashMap<String, Long> partOffsetMapFst =
new ConcurrentHashMap<>();
- partOffsetMap.put("123:test_1:0", 0L);
- partOffsetMap.put("123:test_1:1", 0L);
- partOffsetMap.put("123:test_1:2", 0L);
- partOffsetMap.put("123:test_2:0", 350L);
- partOffsetMap.put("123:test_2:1", 350L);
- partOffsetMap.put("123:test_2:2", 350L);
-
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- int getCount = msgCount;
- MessagePullSetConsumerExample messageConsumer =
- new MessagePullSetConsumerExample(masterServers, group);
- messageConsumer.subscribe(topicAndFiltersMap, partOffsetMap);
- // main logic of consuming
- do {
- ConsumerResult result = messageConsumer.getMessage();
- if (result.isSuccess()) {
- List<Message> messageList = result.getMessageList();
- if (messageList != null) {
- logger.info("Receive messages:" + counter.addAndGet(messageList.size()));
- }
-
- // Offset returned by GetMessage represents the initial offset of this request
- // if consumer group is pure Pull mode, the initial offset can be saved;
- // if not, we have to use the return value of confirmConsume
- long oldValue = partOffsetMap.get(
- result.getPartitionKey()) == null
- ? -1
- : partOffsetMap.get(result.getPartitionKey());
- partOffsetMap.put(result.getPartitionKey(), result.getCurrOffset());
- logger.info(
- "GetMessage , partitionKey={}, oldValue={}, newVal={}",
- new Object[]{result.getPartitionKey(), oldValue, result.getCurrOffset()});
-
- // save the Offset from the return value of confirmConsume
- ConsumerResult confirmResult = messageConsumer.confirmConsume(
- result.getConfirmContext(),
- true);
- if (confirmResult.isSuccess()) {
- oldValue = partOffsetMap.get(
- result.getPartitionKey()) == null
- ? -1
- : partOffsetMap.get(result.getPartitionKey());
- partOffsetMap.put(result.getPartitionKey(), confirmResult.getCurrOffset());
- logger.info(
- "ConfirmConsume , partitionKey={}, oldValue={}, newVal={}",
- new Object[]{
- confirmResult.getPartitionKey(),
- oldValue,
- confirmResult.getCurrOffset()});
- } else {
- logger.info(
- "ConfirmConsume failure, errCode is {}, errInfo is {}.",
- confirmResult.getErrCode(),
- confirmResult.getErrMsg());
- }
- } else {
- if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
- logger.info(
- "Receive messages errorCode is {}, Error message is {}",
- result.getErrCode(), result.getErrMsg());
- }
- }
- if (msgCount >= 0) {
- if (--getCount <= 0) {
- break;
- }
- }
- } while (true);
- } catch (Exception e) {
- logger.error("Create consumer failed!", e);
- }
- }
- });
+ consumer1.completeSubscribe(sessionKeyFst,
+ sourceCountFst, isSelectBig, partOffsetMapFst);
- executorService.shutdown();
- try {
- executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.error("Thread Pool shutdown has been interrupted!");
+ // 3.4. initialize and start fetch threads
+ Thread[] fetchRunners1 = new Thread[fetchThreadCnt];
+ for (int i = 0; i < fetchRunners1.length; i++) {
+ fetchRunners1[i] = new Thread(new FetchRequestRunner(
+ consumer1, partOffsetMapFst, consumeCount), "_fetch_runner_" + i);
+ }
+ for (Thread thread : fetchRunners1) {
+ thread.start();
+ }
+
+ // 3.5. wait consume data
+ ThreadUtils.sleep(2 * 60 * 1000);
+ logger.info("The first consumption has finished!");
+
+ // 3.6. shutdown consumer
+ consumer1.shutdown();
+ for (Thread thread : fetchRunners1) {
+ thread.join();
}
- }
- public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap,
- Map<String, Long> partOffsetMap) throws TubeClientException {
+ logger.info("The second consumption begin!");
+ // 4. Start the consumer group for the second consumption
+ // 4.1 set the bootstrap Offset, here we set consumption from 0
+ String sessionKeySec = "test_consume_Second";
+ int sourceCountSec = 1;
+ ConcurrentHashMap<String, Long> partOffsetMapSec =
+ new ConcurrentHashMap<>();
+ for (String partKey : partOffsetMapFst.keySet()) {
+ partOffsetMapSec.put(partKey, 0L);
+ }
+ // 4.2. build consumer object
+ PullMessageConsumer consumer2 =
+ createPullConsumer(masterServers, groupName);
+ // 4.3. Set the Topic and the filter item set corresponding to the consumption
for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
- messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
+ consumer2.subscribe(entry.getKey(), entry.getValue());
}
- String sessionKey = "test_reset2";
- int consumerCount = 2;
- boolean isSelectBig = false;
- messagePullConsumer.completeSubscribe(sessionKey,
- consumerCount, isSelectBig, partOffsetMap);
- }
+ consumer2.completeSubscribe(sessionKeySec,
+ sourceCountSec, isSelectBig, partOffsetMapSec);
- public ConsumerResult getMessage() throws TubeClientException {
- return messagePullConsumer.getMessage();
+ // 4.4. initialize and start fetch threads
+ Thread[] fetchRunners2 = new Thread[fetchThreadCnt];
+ for (int i = 0; i < fetchRunners2.length; i++) {
+ fetchRunners2[i] = new Thread(new FetchRequestRunner(
+ consumer2, partOffsetMapSec, consumeCount), "_fetch_runner_" + i);
+ }
+ for (Thread thread : fetchRunners2) {
+ thread.start();
+ }
}
- public ConsumerResult confirmConsume(
- String confirmContext,
- boolean isConsumed
- ) throws TubeClientException {
- return messagePullConsumer.confirmConsume(confirmContext, isConsumed);
+    /**
+     * Builds a pull consumer for the given master cluster and consumer group.
+     *
+     * The session factory is looked up in {@code multSessFtyMap} by the master cluster
+     * string; when none is registered yet a new {@link TubeMultiSessionFactory} is created
+     * and published with {@code putIfAbsent}. If another caller registered a factory first,
+     * the freshly created one is shut down and the registered one is used instead.
+     *
+     * @param masterHostAndPorts  the master address(es) used to build the consumer config
+     * @param groupName           the name of the consumer group
+     * @return a pull consumer created by the session factory of that cluster
+     * @throws Exception  if building the config, the factory or the consumer fails
+     */
+    private static PullMessageConsumer createPullConsumer(
+            String masterHostAndPorts, String groupName) throws Exception {
+        // Consumer configuration; start from the latest position
+        // if the consumer group is consuming for the first time
+        ConsumerConfig consumerConfig =
+                new ConsumerConfig(masterHostAndPorts, groupName);
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        // Fast path: a factory for this master cluster is already registered
+        MasterInfo masterInfo = consumerConfig.getMasterInfo();
+        MessageSessionFactory registered =
+                multSessFtyMap.get(masterInfo.getMasterClusterStr());
+        if (registered != null) {
+            return registered.createPullConsumer(consumerConfig);
+        }
+        // Slow path: create a candidate factory and try to publish it
+        MessageSessionFactory candidate =
+                new TubeMultiSessionFactory(consumerConfig);
+        MessageSessionFactory winner = multSessFtyMap.putIfAbsent(
+                masterInfo.getMasterClusterStr(), candidate);
+        if (winner == null) {
+            // our candidate is now the registered factory
+            return candidate.createPullConsumer(consumerConfig);
+        }
+        // another factory was registered first: discard the candidate
+        candidate.shutdown();
+        return winner.createPullConsumer(consumerConfig);
+    }
- public Map<String, ConsumeOffsetInfo> getCurrPartitionOffsetMap() throws TubeClientException {
- return messagePullConsumer.getCurConsumedPartitions();
+ // fetch message runner
+    /**
+     * Fetch worker that pulls messages from the given {@link PullMessageConsumer},
+     * adds the size of each non-empty batch to {@code msgRecvStats}, confirms every
+     * successful fetch and records the latest offset of each partition in the supplied map.
+     */
+    private static class FetchRequestRunner implements Runnable {
+
+        final PullMessageConsumer pullConsumer;
+        // partitionKey -> last recorded offset; the map instance is supplied by the caller
+        final ConcurrentHashMap<String, Long> partOffsetMap;
+        // number of loop rounds to run; values <= 0 disable the limit
+        final int consumeCount;
+
+        /**
+         * @param messageConsumer  the consumer used to fetch and confirm messages
+         * @param partOffsetMap    the map that receives the partitionKey/offset pairs
+         * @param msgCount         the round limit of the fetch loop, no limit if not positive
+         */
+        FetchRequestRunner(PullMessageConsumer messageConsumer,
+                           ConcurrentHashMap<String, Long> partOffsetMap,
+                           int msgCount) {
+            this.pullConsumer = messageConsumer;
+            this.partOffsetMap = partOffsetMap;
+            this.consumeCount = msgCount;
+        }
+
+        /**
+         * Waits until the partitions are ready (or the consumer is shut down), then
+         * fetches and confirms messages until the consumer is shut down or the
+         * configured number of rounds has been completed.
+         */
+        @Override
+        public void run() {
+            ConsumerResult csmResult;
+            ConsumerResult cfmResult;
+            int getCount = consumeCount;
+            // wait partition status ready
+            do {
+                if (pullConsumer.isPartitionsReady(5000)
+                        || pullConsumer.isShutdown()) {
+                    break;
+                }
+            } while (true);
+            // consume messages
+            do {
+                try {
+                    // 1 judge consumer is shutdown
+                    if (pullConsumer.isShutdown()) {
+                        logger.warn("Consumer is shutdown!");
+                        break;
+                    }
+                    // 2 get messages from server
+                    csmResult = pullConsumer.getMessage();
+                    if (csmResult.isSuccess()) {
+                        // 2.1 process message if getMessage() return success
+                        List<Message> messageList = csmResult.getMessageList();
+                        if (messageList != null && !messageList.isEmpty()) {
+                            msgRecvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+                        }
+                        // 2.1.2 store the offset of processing message
+                        // the offset returned by GetMessage() represents the initial offset of this request
+                        // if consumer group is pure Pull mode, the initial offset can be saved;
+                        // if not, we have to use the return value of confirmConsume()
+                        partOffsetMap.put(csmResult.getPartitionKey(), csmResult.getCurrOffset());
+                        // 2.1.3 confirm consume result
+                        cfmResult = pullConsumer.confirmConsume(
+                                csmResult.getConfirmContext(), true);
+                        if (cfmResult.isSuccess()) {
+                            // store confirmed offset value: it must come from the confirm
+                            // result, not from the fetch result stored in step 2.1.2
+                            partOffsetMap.put(csmResult.getPartitionKey(), cfmResult.getCurrOffset());
+                        }
+                    }
+                    // 3. Determine whether the consumed data reaches the goal
+                    // (the counter is decreased once per completed round,
+                    // whether or not the fetch succeeded)
+                    if (consumeCount > 0) {
+                        if (--getCount <= 0) {
+                            logger.info("Consumer has consumed {} messages!", consumeCount);
+                            break;
+                        }
+                    }
+                } catch (Throwable e) {
+                    // Any exceptions in running can be ignored:
+                    // the loop simply retries on the next round
+                }
+            } while (true);
+            logger.info("The fetch thread has exited!");
+        }
+    }
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
index 0bdc5c7..0db4acd 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
@@ -91,17 +91,17 @@ public final class MessagePushConsumerExample {
// 3. build and start consumer object
for (int i = 0; i < clientCount; i++) {
- // 2. build and start consumer object
+ // 3.1. build and start consumer object
PushMessageConsumer consumer =
sessionFactory.createPushConsumer(consumerConfig);
- // 2.1 set subscribed topic and Listener
+ // 3.2. set subscribed topic and Listener
for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
MessageListener messageListener = new DefaultMessageListener(entry.getKey());
consumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
}
- // 2.2 start consumer
+ // 3.3. start consumer
consumer.completeSubscribe();
- // 2.3 store consumer object
+ // 3.4. store consumer object
consumerMap.put(consumer.getConsumerId(), consumer);
}