You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/04 09:10:25 UTC
[incubator-inlong] branch INLONG-1739 updated:
[INLONG-1748][Improve] Optimize the realization of class
MessagePullConsumerExample (#1749)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-1739 by this push:
new ebb2c7c [INLONG-1748][Improve] Optimize the realization of class MessagePullConsumerExample (#1749)
ebb2c7c is described below
commit ebb2c7c6363f61c3c5cd0b47a49abd0c7db49ae1
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Nov 4 17:08:01 2021 +0800
[INLONG-1748][Improve] Optimize the realization of class MessagePullConsumerExample (#1749)
---
.../tubemq/example/MessagePullConsumerExample.java | 202 +++++++++++----------
.../apache/inlong/tubemq/example/MsgRecvStats.java | 21 +--
2 files changed, 121 insertions(+), 102 deletions(-)
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
index 4c243fe..958b749 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
@@ -18,15 +18,14 @@
package org.apache.inlong.tubemq.example;
import static org.apache.inlong.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
+
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumeOffsetInfo;
import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.inlong.tubemq.corebase.Message;
@@ -35,117 +34,138 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This demo shows how to consume message by pull.
+ * This demo shows how to consume messages through TubeSingleSessionFactory + PullMessageConsumer
*
*Consume message in pull mode achieved by {@link PullMessageConsumer#getMessage()}.
* Note that whenever {@link PullMessageConsumer#getMessage()} returns successfully, the
- * return value(whether or not to be {@code null}) should be processed by
+ * return value(whether or not to be {@code null}) must be processed by
* {@link PullMessageConsumer#confirmConsume(String, boolean)}.
*/
public final class MessagePullConsumerExample {
private static final Logger logger =
LoggerFactory.getLogger(MessagePullConsumerExample.class);
- private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
-
- private final PullMessageConsumer messagePullConsumer;
- private final MessageSessionFactory messageSessionFactory;
-
- public MessagePullConsumerExample(String masterHostAndPort, String group) throws Exception {
- ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
- consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
- this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
- this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
- }
+ private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+ private static PullMessageConsumer pullConsumer;
+ private static MessageSessionFactory sessionFactory;
+
+ // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+ // the master address(es) to connect to;
+ // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+ // the topic(s) (and filter condition set) to consume on.
+ // The 3rd parameter groupName is the name of consumer group
+ // The 4th parameter consumeCount is the amount of messages that need to be consumed
+ // The 5th parameter fetchThreadCnt is the count of fetch thread
public static void main(String[] args) throws Throwable {
- // get and initial parameters
+ // 1. get and initial parameters
final String masterServers = args[0];
- final String topics = args[1];
- final String group = args[2];
- final int msgCount = Integer.parseInt(args[3]);
+ final String subTopicAndFiterItems = args[1];
+ final String groupName = args[2];
+ final int consumeCount = Integer.parseInt(args[3]);
+ int fetchThreadCnt = 3;
+ if (args.length > 4) {
+ fetchThreadCnt = MixedUtils.mid(Integer.parseInt(args[4]),
+ 1, Runtime.getRuntime().availableProcessors());
+ }
final Map<String, TreeSet<String>> topicAndFiltersMap =
- MixedUtils.parseTopicParam(topics);
- // initial consumer object
- final MessagePullConsumerExample messageConsumer =
- new MessagePullConsumerExample(masterServers, group);
- messageConsumer.subscribe(topicAndFiltersMap);
- Thread[] fetchRunners = new Thread[3];
+ MixedUtils.parseTopicParam(subTopicAndFiterItems);
+
+ // 2. initial configure and build session factory object
+ ConsumerConfig consumerConfig =
+ new ConsumerConfig(masterServers, groupName);
+ // 2.1 set consume from latest position if the consumer group is first consume
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ // 2.2 build session factory object
+ // Attention: here we are using the TubeSingleSessionFactory object(a
+ // singleton session factory can only create one object in a process,
+ // requiring all topics to be in one cluster.
+ // If the topics subscribed to by the objects in the process are
+ // in different clusters, then need to use the TubeMultiSessionFactory class,
+ // please refer to the example of TubeMultiSessionFactory usage)
+ sessionFactory = new TubeSingleSessionFactory(consumerConfig);
+
+ // 3 build consumer object
+ // we can construct multiple consumers after the creation of the session factory object
+ pullConsumer = sessionFactory.createPullConsumer(consumerConfig);
+ // 3.1 Set the Topic and the filter item set corresponding to the consumption
+ // if you not need filter consumption,
+ // set the parameter filterConds is null or empty set
+ for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+ pullConsumer.subscribe(entry.getKey(), entry.getValue());
+ }
+ // 3.2 start consumption
+ pullConsumer.completeSubscribe();
+
+ // 4. initial fetch threads
+ Thread[] fetchRunners = new Thread[fetchThreadCnt];
for (int i = 0; i < fetchRunners.length; i++) {
- fetchRunners[i] = new Thread(new FetchRequestRunner(messageConsumer, msgCount));
- fetchRunners[i].setName("_fetch_runner_" + i);
+ fetchRunners[i] = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ ConsumerResult csmResult;
+ int getCount = consumeCount;
+ do {
+ try {
+ // 4.1 judge consumer is shutdown
+ if (pullConsumer.isShutdown()) {
+ logger.warn("Consumer is shutdown!");
+ break;
+ }
+ // 4.2 get messages from server
+ csmResult = pullConsumer.getMessage();
+ if (csmResult.isSuccess()) {
+ // 4.2.1 process message if getMessage() return success
+ List<Message> messageList = csmResult.getMessageList();
+ if (messageList != null && !messageList.isEmpty()) {
+ msgRecvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+ }
+ // 4.2.1.1 confirm consume result
+ // Notice:
+ // 1. If the processing fails, the parameter isConsumed can
+ // be set to false, but this is likely to cause
+ // an infinite loop of data consumption, so
+ // it is strongly recommended to set this parameter
+ // to true when using it, and the failed data can
+ // be processed in other ways
+ // 2. The messageList returned by getMessage() may be empty,
+ // and confirmConsume() is still required call in this case
+ pullConsumer.confirmConsume(csmResult.getConfirmContext(), true);
+ } else {
+ // 4.2.2 process failure when getMessage() return false
+ // Any failure can be ignored
+ if (!IGNORE_ERROR_SET.contains(csmResult.getErrCode())) {
+ logger.debug(
+ "Receive messages errorCode is {}, Error message is {}",
+ csmResult.getErrCode(), csmResult.getErrMsg());
+ }
+ }
+ // 4.3 Determine whether the consumed data reaches the goal
+ if (consumeCount > 0) {
+ if (--getCount <= 0) {
+ logger.info("Consumer has consumed {} messages!", consumeCount);
+ break;
+ }
+ }
+ } catch (Throwable e) {
+ // Any exceptions in running can be ignored
+ }
+ } while (true);
+ logger.info("The fetch thread has exited!");
+ }
+ }, "_fetch_runner_" + i);
}
- // initial fetch threads
+
+ // 5. start fetch threads
for (Thread thread : fetchRunners) {
thread.start();
}
- // initial statistic thread
+
+ // 6. initial and statistic thread
Thread statisticThread =
new Thread(msgRecvStats, "Sent Statistic Thread");
statisticThread.start();
}
- public void subscribe(
- Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
- for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
- messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
- }
- messagePullConsumer.completeSubscribe();
- }
-
- public ConsumerResult getMessage() throws TubeClientException {
- return messagePullConsumer.getMessage();
- }
-
- public ConsumerResult confirmConsume(final String confirmContext,
- boolean isConsumed) throws TubeClientException {
- return messagePullConsumer.confirmConsume(confirmContext, isConsumed);
- }
-
- public Map<String, ConsumeOffsetInfo> getCurrPartitionOffsetMap() throws TubeClientException {
- return messagePullConsumer.getCurConsumedPartitions();
- }
-
- private static class FetchRequestRunner implements Runnable {
-
- final MessagePullConsumerExample messageConsumer;
- final int consumeCount;
-
- FetchRequestRunner(final MessagePullConsumerExample messageConsumer, int msgCount) {
- this.messageConsumer = messageConsumer;
- this.consumeCount = msgCount;
- }
-
- @Override
- public void run() {
- try {
- int getCount = consumeCount;
- do {
- ConsumerResult result = messageConsumer.getMessage();
- if (result.isSuccess()) {
- List<Message> messageList = result.getMessageList();
- if (messageList != null && !messageList.isEmpty()) {
- msgRecvStats.addMsgCount(result.getTopicName(), messageList.size());
- }
- messageConsumer.confirmConsume(result.getConfirmContext(), true);
- } else {
- if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
- logger.info(
- "Receive messages errorCode is {}, Error message is {}",
- result.getErrCode(), result.getErrMsg());
- }
- }
- if (consumeCount > 0) {
- if (--getCount <= 0) {
- break;
- }
- }
- } while (true);
- msgRecvStats.stopStats();
- } catch (TubeClientException e) {
- logger.error("Create consumer failed!", e);
- }
- }
- }
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
index 9074026..f418052 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
@@ -29,10 +29,13 @@ import org.slf4j.LoggerFactory;
* This demo shows how to collect and report message received statistics.
*/
public class MsgRecvStats implements Runnable {
- private static final Logger logger = LoggerFactory.getLogger(MsgRecvStats.class);
- private static final ConcurrentHashMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
- private static final ConcurrentHashMap<String, AtomicLong> befCountMap = new ConcurrentHashMap<>();
- private AtomicBoolean isStarted = new AtomicBoolean(false);
+ private static final Logger logger =
+ LoggerFactory.getLogger(MsgRecvStats.class);
+ private static final ConcurrentHashMap<String, AtomicLong> counterMap =
+ new ConcurrentHashMap<>();
+ private static final ConcurrentHashMap<String, AtomicLong> befCountMap =
+ new ConcurrentHashMap<>();
+ private AtomicBoolean isStarted = new AtomicBoolean(true);
@Override
public void run() {
@@ -40,7 +43,6 @@ public class MsgRecvStats implements Runnable {
try {
for (Map.Entry<String, AtomicLong> entry : counterMap.entrySet()) {
long currCount = entry.getValue().get();
-
AtomicLong befCount = befCountMap.get(entry.getKey());
if (befCount == null) {
AtomicLong tmpCount = new AtomicLong(0);
@@ -49,10 +51,11 @@ public class MsgRecvStats implements Runnable {
befCount = tmpCount;
}
}
-
+ // output received statistic information
logger.info("********* Current {} Message receive count is {}, dlt is {}",
new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
-
+ // archive historical statistic data
+ befCountMap.get(entry.getKey()).set(currCount);
}
} catch (Throwable t) {
// ignore
@@ -71,10 +74,6 @@ public class MsgRecvStats implements Runnable {
currCount = tmpCount;
}
}
-
- if (currCount.addAndGet(msgCnt) % 500 == 0) {
- logger.info("Receive messages:" + currCount.get());
- }
}
}