You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/04 10:35:58 UTC
[incubator-inlong] branch INLONG-1739 updated: [INLONG-1750]
Optimize the realization of class MessageConsumerExample (#1751)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-1739 by this push:
new 29866d7 [INLONG-1750] Optimize the realization of class MessageConsumerExample (#1751)
29866d7 is described below
commit 29866d75dc8aa0cf45608e42b9f676b08a51bd43
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Nov 4 18:35:48 2021 +0800
[INLONG-1750] Optimize the realization of class MessageConsumerExample (#1751)
---
.../tubemq/client/config/TubeClientConfig.java | 2 +-
.../inlong/tubemq/corebase/cluster/MasterInfo.java | 8 +-
.../tubemq/example/MessagePullConsumerExample.java | 9 ++
...xample.java => MessagePushConsumerExample.java} | 128 +++++++++++----------
4 files changed, 79 insertions(+), 68 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
index 83285b2..c567c37 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
@@ -29,7 +29,7 @@ import org.apache.inlong.tubemq.corerpc.RpcConstants;
*/
public class TubeClientConfig {
// Master information.
- private MasterInfo masterInfo;
+ private final MasterInfo masterInfo;
// Rpc read time out.
private long rpcReadTimeoutMs = RpcConstants.CFG_RPC_READ_TIMEOUT_DEFAULT_MS;
// Rpc connection processor number.
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
index 2de42ba..680bb05 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
@@ -29,9 +29,9 @@ public class MasterInfo {
private final Map<String/* ip:port */, NodeAddrInfo> addrMap4Failover =
new HashMap<>();
- private List<String> nodeHostPortList;
+ private final List<String> nodeHostPortList;
private NodeAddrInfo firstNodeAddr = null;
- private String masterClusterStr;
+ private final String masterClusterStr;
/**
* masterAddrInfo: "ip1:port,ip2:port"
@@ -70,9 +70,7 @@ public class MasterInfo {
}
int port = Integer.parseInt(hostPortItem[1].trim());
NodeAddrInfo tmpNodeAddrInfo = new NodeAddrInfo(hostName, port);
- if (addrMap4Failover.get(tmpNodeAddrInfo.getHostPortStr()) == null) {
- addrMap4Failover.put(tmpNodeAddrInfo.getHostPortStr(), tmpNodeAddrInfo);
- }
+ addrMap4Failover.putIfAbsent(tmpNodeAddrInfo.getHostPortStr(), tmpNodeAddrInfo);
if (this.firstNodeAddr == null) {
this.firstNodeAddr = tmpNodeAddrInfo;
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
index 958b749..0b2d71e 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
@@ -165,6 +165,15 @@ public final class MessagePullConsumerExample {
Thread statisticThread =
new Thread(msgRecvStats, "Sent Statistic Thread");
statisticThread.start();
+
+ // 7. Resource cleanup when exiting the service
+ //
+ // 7.1 shutdown consumers
+ // pullConsumer.shutdown();
+ // 7.2 shutdown session factory
+ // sessionFactory.shutdown();
+ // 7.3 shutdown statistic thread
+ // msgRecvStats.stopStats();
}
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
similarity index 50%
rename from inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java
rename to inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
index f251a67..0bdc5c7 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
@@ -17,28 +17,23 @@
package org.apache.inlong.tubemq.example;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.apache.inlong.tubemq.client.common.PeerInfo;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
import org.apache.inlong.tubemq.client.consumer.MessageListener;
import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * This demo shows how to consume message sequentially.
+ * This demo shows how to consume messages sequentially in Push mode.
*
*Consumer supports subscribe multiple topics in one consume group. Message from subscription
* sent back to business logic via callback {@link MessageListener}. It is highly recommended NOT
@@ -52,78 +47,87 @@ import org.slf4j.LoggerFactory;
* <li>call {@link PushMessageConsumer#resumeConsume()} to resume consumption</li>
* </ul>
*/
-public final class MessageConsumerExample {
+public final class MessagePushConsumerExample {
- private static final Logger logger =
- LoggerFactory.getLogger(MessageConsumerExample.class);
private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+ private static MessageSessionFactory sessionFactory;
+ private static final Map<String, PushMessageConsumer> consumerMap = new HashMap<>();
- private final PushMessageConsumer messageConsumer;
- private final MessageSessionFactory messageSessionFactory;
+ // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+ // the master address(es) to connect to;
+ // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+ // the topic(s) (and filter condition set) to consume on.
+ // The 3rd parameter groupName is the name of consumer group
+ // The 4th parameter clientCount is the amount of consumer
+ // The 5th parameter fetchThreadCnt is the count of fetch thread
+ public static void main(String[] args) throws Throwable {
+ // 1. get and initial parameters
+ final String masterServers = args[0];
+ final String subTopicAndFiterItems = args[1];
+ final String groupName = args[2];
+ int clientCount = Integer.parseInt(args[3]);
+ if (clientCount <= 0) {
+ clientCount = 1;
+ }
+ int paraFetchThreadCnt = -1;
+ if (args.length > 5) {
+ paraFetchThreadCnt = Integer.parseInt(args[4]);
+ }
+ final int fetchThreadCnt = paraFetchThreadCnt;
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(subTopicAndFiterItems);
- public MessageConsumerExample(String masterHostAndPort,
- String group, int fetchThreadCnt) throws Exception {
- ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
+ // 2. initial configure and build session factory object
+ ConsumerConfig consumerConfig =
+ new ConsumerConfig(masterServers, groupName);
+ // 2.1. set consume from latest position if the consumer group is first consume
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ // 2.2. set the fetch thread count of push consumer
if (fetchThreadCnt > 0) {
consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
}
- this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
- this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
- }
+ // 2.3. build session factory object
+ sessionFactory = new TubeSingleSessionFactory(consumerConfig);
- public static void main(String[] args) {
- final String masterServers = args[0];
- final String topics = args[1];
- final String group = args[2];
- final int clientCount = Integer.parseInt(args[3]);
- int threadCnt = -1;
- if (args.length > 5) {
- threadCnt = Integer.parseInt(args[4]);
- }
- final int fetchThreadCnt = threadCnt;
- final Map<String, TreeSet<String>> topicAndFiltersMap =
- MixedUtils.parseTopicParam(topics);
- final ExecutorService executorService =
- Executors.newCachedThreadPool();
+ // 3. build and start consumer object
for (int i = 0; i < clientCount; i++) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- MessageConsumerExample messageConsumer =
- new MessageConsumerExample(masterServers,
- group, fetchThreadCnt);
- messageConsumer.subscribe(topicAndFiltersMap);
- } catch (Exception e) {
- logger.error("Create consumer failed!", e);
- }
- }
- });
+ // 2. build and start consumer object
+ PushMessageConsumer consumer =
+ sessionFactory.createPushConsumer(consumerConfig);
+ // 2.1 set subscribed topic and Listener
+ for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+ MessageListener messageListener = new DefaultMessageListener(entry.getKey());
+ consumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
+ }
+ // 2.2 start consumer
+ consumer.completeSubscribe();
+ // 2.3 store consumer object
+ consumerMap.put(consumer.getConsumerId(), consumer);
}
- final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread");
- statisticThread.start();
- executorService.shutdown();
- try {
- executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.error("Thread Pool shutdown has been interrupted!");
- }
- msgRecvStats.stopStats();
- }
+ // 4. initial and statistic thread
+ Thread statisticThread =
+ new Thread(msgRecvStats, "Sent Statistic Thread");
+ statisticThread.start();
- public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
- for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
- MessageListener messageListener = new DefaultMessageListener(entry.getKey());
- messageConsumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
- }
- messageConsumer.completeSubscribe();
+ // 5. Resource cleanup when exiting the service
+ //
+ // 5.1 shutdown consumers
+ // for (PushMessageConsumer consumer : consumerMap.values()) {
+ // consumer.shutdown();
+ // }
+ // 5.2 shutdown session factory
+ // sessionFactory.shutdown();
+ // 5.3 shutdown statistic thread
+ // msgRecvStats.stopStats();
}
+ // Message callback processing class.
+ // After the SDK receives the message, it will pass the message back to the business layer
+ // for message processing by calling the receiveMessages() API of this class
public static class DefaultMessageListener implements MessageListener {
- private String topic;
+ private final String topic;
public DefaultMessageListener(String topic) {
this.topic = topic;