You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/04 10:35:58 UTC

[incubator-inlong] branch INLONG-1739 updated: [INLONG-1750] Optimize the realization of class MessageConsumerExample (#1751)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-1739 by this push:
     new 29866d7  [INLONG-1750] Optimize the realization of class MessageConsumerExample (#1751)
29866d7 is described below

commit 29866d75dc8aa0cf45608e42b9f676b08a51bd43
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Nov 4 18:35:48 2021 +0800

    [INLONG-1750] Optimize the realization of class MessageConsumerExample (#1751)
---
 .../tubemq/client/config/TubeClientConfig.java     |   2 +-
 .../inlong/tubemq/corebase/cluster/MasterInfo.java |   8 +-
 .../tubemq/example/MessagePullConsumerExample.java |   9 ++
 ...xample.java => MessagePushConsumerExample.java} | 128 +++++++++++----------
 4 files changed, 79 insertions(+), 68 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
index 83285b2..c567c37 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
@@ -29,7 +29,7 @@ import org.apache.inlong.tubemq.corerpc.RpcConstants;
  */
 public class TubeClientConfig {
     // Master information.
-    private MasterInfo masterInfo;
+    private final MasterInfo masterInfo;
     // Rpc read time out.
     private long rpcReadTimeoutMs = RpcConstants.CFG_RPC_READ_TIMEOUT_DEFAULT_MS;
     // Rpc connection processor number.
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
index 2de42ba..680bb05 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/MasterInfo.java
@@ -29,9 +29,9 @@ public class MasterInfo {
 
     private final Map<String/* ip:port */, NodeAddrInfo> addrMap4Failover =
             new HashMap<>();
-    private List<String> nodeHostPortList;
+    private final List<String> nodeHostPortList;
     private NodeAddrInfo firstNodeAddr = null;
-    private String masterClusterStr;
+    private final String masterClusterStr;
 
     /**
      * masterAddrInfo: "ip1:port,ip2:port"
@@ -70,9 +70,7 @@ public class MasterInfo {
             }
             int port = Integer.parseInt(hostPortItem[1].trim());
             NodeAddrInfo tmpNodeAddrInfo = new NodeAddrInfo(hostName, port);
-            if (addrMap4Failover.get(tmpNodeAddrInfo.getHostPortStr()) == null) {
-                addrMap4Failover.put(tmpNodeAddrInfo.getHostPortStr(), tmpNodeAddrInfo);
-            }
+            addrMap4Failover.putIfAbsent(tmpNodeAddrInfo.getHostPortStr(), tmpNodeAddrInfo);
             if (this.firstNodeAddr == null) {
                 this.firstNodeAddr = tmpNodeAddrInfo;
             }
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
index 958b749..0b2d71e 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
@@ -165,6 +165,15 @@ public final class MessagePullConsumerExample {
         Thread statisticThread =
                 new Thread(msgRecvStats, "Sent Statistic Thread");
         statisticThread.start();
+
+        // 7. Resource cleanup when exiting the service
+        //
+        // 7.1 shutdown consumers
+        // pullConsumer.shutdown();
+        // 7.2 shutdown session factory
+        // sessionFactory.shutdown();
+        // 7.3 shutdown statistic thread
+        // msgRecvStats.stopStats();
     }
 
 }
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
similarity index 50%
rename from inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java
rename to inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
index f251a67..0bdc5c7 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
@@ -17,28 +17,23 @@
 
 package org.apache.inlong.tubemq.example;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import org.apache.inlong.tubemq.client.common.PeerInfo;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
 import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
 import org.apache.inlong.tubemq.client.consumer.MessageListener;
 import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
 import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * This demo shows how to consume message sequentially.
+ * This demo shows how to consume messages sequentially in Push mode.
  *
  *Consumer supports subscribe multiple topics in one consume group. Message from subscription
  * sent back to business logic via callback {@link MessageListener}. It is highly recommended NOT
@@ -52,78 +47,87 @@ import org.slf4j.LoggerFactory;
  *     <li>call {@link PushMessageConsumer#resumeConsume()} to resume consumption</li>
  * </ul>
  */
-public final class MessageConsumerExample {
+public final class MessagePushConsumerExample {
 
-    private static final Logger logger =
-            LoggerFactory.getLogger(MessageConsumerExample.class);
     private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+    private static MessageSessionFactory sessionFactory;
+    private static final Map<String, PushMessageConsumer> consumerMap = new HashMap<>();
 
-    private final PushMessageConsumer messageConsumer;
-    private final MessageSessionFactory messageSessionFactory;
+    // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+    //     the master address(es) to connect to;
+    // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+    //     the topic(s) (and filter condition set) to consume on.
+    // The 3rd parameter groupName is the name of consumer group
+    // The 4th parameter clientCount is the number of consumers to start
+    // The 5th parameter fetchThreadCnt is the number of fetch threads
+    public static void main(String[] args) throws Throwable {
+        // 1. get and initialize parameters
+        final String masterServers = args[0];
+        final String subTopicAndFiterItems = args[1];
+        final String groupName = args[2];
+        int clientCount = Integer.parseInt(args[3]);
+        if (clientCount <= 0) {
+            clientCount = 1;
+        }
+        int paraFetchThreadCnt = -1;
+        if (args.length > 5) {
+            paraFetchThreadCnt = Integer.parseInt(args[4]);
+        }
+        final int fetchThreadCnt = paraFetchThreadCnt;
+        final Map<String, TreeSet<String>> topicAndFiltersMap =
+                MixedUtils.parseTopicParam(subTopicAndFiterItems);
 
-    public MessageConsumerExample(String masterHostAndPort,
-                                  String group, int fetchThreadCnt) throws Exception {
-        ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
+        // 2. initialize the configuration and build the session factory object
+        ConsumerConfig consumerConfig =
+                new ConsumerConfig(masterServers, groupName);
+        // 2.1. consume from the latest position if the consumer group is consuming for the first time
         consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        // 2.2. set the fetch thread count of push consumer
         if (fetchThreadCnt > 0) {
             consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
         }
-        this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
-        this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
-    }
+        // 2.3. build session factory object
+        sessionFactory = new TubeSingleSessionFactory(consumerConfig);
 
-    public static void main(String[] args) {
-        final String masterServers = args[0];
-        final String topics = args[1];
-        final String group = args[2];
-        final int clientCount = Integer.parseInt(args[3]);
-        int threadCnt = -1;
-        if (args.length > 5) {
-            threadCnt = Integer.parseInt(args[4]);
-        }
-        final int fetchThreadCnt = threadCnt;
-        final Map<String, TreeSet<String>> topicAndFiltersMap =
-                MixedUtils.parseTopicParam(topics);
-        final ExecutorService executorService =
-                Executors.newCachedThreadPool();
+        // 3. build and start consumer object
         for (int i = 0; i < clientCount; i++) {
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        MessageConsumerExample messageConsumer =
-                                new MessageConsumerExample(masterServers,
-                                        group, fetchThreadCnt);
-                        messageConsumer.subscribe(topicAndFiltersMap);
-                    } catch (Exception e) {
-                        logger.error("Create consumer failed!", e);
-                    }
-                }
-            });
+            // 3.1 create consumer object
+            PushMessageConsumer consumer =
+                    sessionFactory.createPushConsumer(consumerConfig);
+            // 3.2 set subscribed topic and Listener
+            for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+                MessageListener messageListener = new DefaultMessageListener(entry.getKey());
+                consumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
+            }
+            // 3.3 start consumer
+            consumer.completeSubscribe();
+            // 3.4 store consumer object
+            consumerMap.put(consumer.getConsumerId(), consumer);
         }
-        final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread");
-        statisticThread.start();
 
-        executorService.shutdown();
-        try {
-            executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            logger.error("Thread Pool shutdown has been interrupted!");
-        }
-        msgRecvStats.stopStats();
-    }
+        // 4. initialize and start the statistic thread
+        Thread statisticThread =
+                new Thread(msgRecvStats, "Sent Statistic Thread");
+        statisticThread.start();
 
-    public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
-        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
-            MessageListener messageListener = new DefaultMessageListener(entry.getKey());
-            messageConsumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
-        }
-        messageConsumer.completeSubscribe();
+        // 5. Resource cleanup when exiting the service
+        //
+        // 5.1 shutdown consumers
+        // for (PushMessageConsumer consumer : consumerMap.values()) {
+        //     consumer.shutdown();
+        // }
+        // 5.2 shutdown session factory
+        // sessionFactory.shutdown();
+        // 5.3 shutdown statistic thread
+        // msgRecvStats.stopStats();
     }
 
+    // Message callback processing class.
+    // After the SDK receives the message, it will pass the message back to the business layer
+    // for message processing by calling the receiveMessages() API of this class
     public static class DefaultMessageListener implements MessageListener {
 
-        private String topic;
+        private final String topic;
 
         public DefaultMessageListener(String topic) {
             this.topic = topic;