You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/04 09:10:25 UTC

[incubator-inlong] branch INLONG-1739 updated: [INLONG-1748][Improve] Optimize the realization of class MessagePullConsumerExample (#1749)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-1739 by this push:
     new ebb2c7c  [INLONG-1748][Improve] Optimize the realization of class MessagePullConsumerExample (#1749)
ebb2c7c is described below

commit ebb2c7c6363f61c3c5cd0b47a49abd0c7db49ae1
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Nov 4 17:08:01 2021 +0800

    [INLONG-1748][Improve] Optimize the realization of class MessagePullConsumerExample (#1749)
---
 .../tubemq/example/MessagePullConsumerExample.java | 202 +++++++++++----------
 .../apache/inlong/tubemq/example/MsgRecvStats.java |  21 +--
 2 files changed, 121 insertions(+), 102 deletions(-)

diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
index 4c243fe..958b749 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
@@ -18,15 +18,14 @@
 package org.apache.inlong.tubemq.example;
 
 import static org.apache.inlong.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
+
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumeOffsetInfo;
 import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
 import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
 import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.inlong.tubemq.corebase.Message;
@@ -35,117 +34,138 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This demo shows how to consume message by pull.
+ * This demo shows how to consume messages through TubeSingleSessionFactory + PullMessageConsumer
  *
  *Consume message in pull mode achieved by {@link PullMessageConsumer#getMessage()}.
  * Note that whenever {@link PullMessageConsumer#getMessage()} returns successfully, the
- * return value(whether or not to be {@code null}) should be processed by
+ * return value(whether or not to be {@code null}) must be processed by
  * {@link PullMessageConsumer#confirmConsume(String, boolean)}.
  */
 public final class MessagePullConsumerExample {
 
     private static final Logger logger =
             LoggerFactory.getLogger(MessagePullConsumerExample.class);
-    private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
-
-    private final PullMessageConsumer messagePullConsumer;
-    private final MessageSessionFactory messageSessionFactory;
-
-    public MessagePullConsumerExample(String masterHostAndPort, String group) throws Exception {
-        ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
-        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
-        this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
-        this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
-    }
 
+    private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+    private static PullMessageConsumer pullConsumer;
+    private static MessageSessionFactory sessionFactory;
+
+    // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+    //     the master address(es) to connect to;
+    // The 2nd parameter subTopicAndFiterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+    //     the topic(s) (and filter condition set) to consume on.
+    // The 3rd parameter groupName is the name of consumer group
+    // The 4th parameter consumeCount is the amount of messages that need to be consumed
+    // The 5th parameter fetchThreadCnt is the count of fetch thread
     public static void main(String[] args) throws Throwable {
-        // get and initial parameters
+        // 1. get and initial parameters
         final String masterServers = args[0];
-        final String topics = args[1];
-        final String group = args[2];
-        final int msgCount = Integer.parseInt(args[3]);
+        final String subTopicAndFiterItems = args[1];
+        final String groupName = args[2];
+        final int consumeCount = Integer.parseInt(args[3]);
+        int fetchThreadCnt = 3;
+        if (args.length > 4) {
+            fetchThreadCnt = MixedUtils.mid(Integer.parseInt(args[4]),
+                    1, Runtime.getRuntime().availableProcessors());
+        }
         final Map<String, TreeSet<String>> topicAndFiltersMap =
-                MixedUtils.parseTopicParam(topics);
-        // initial consumer object
-        final MessagePullConsumerExample messageConsumer =
-                new MessagePullConsumerExample(masterServers, group);
-        messageConsumer.subscribe(topicAndFiltersMap);
-        Thread[] fetchRunners = new Thread[3];
+                MixedUtils.parseTopicParam(subTopicAndFiterItems);
+
+        // 2. initial configure and build session factory object
+        ConsumerConfig consumerConfig =
+                new ConsumerConfig(masterServers, groupName);
+        // 2.1 set consume from latest position if the consumer group is first consume
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        // 2.2 build session factory object
+        //     Attention: here we are using the TubeSingleSessionFactory object(a
+        //                singleton session factory can only create one object in a process,
+        //                requiring all topics to be in one cluster.
+        //                If the topics subscribed to by the objects in the process are
+        //                in different clusters, then need to use the TubeMultiSessionFactory class,
+        //                please refer to the example of TubeMultiSessionFactory usage)
+        sessionFactory = new TubeSingleSessionFactory(consumerConfig);
+
+        // 3 build consumer object
+        //    we can construct multiple consumers after the creation of the session factory object
+        pullConsumer = sessionFactory.createPullConsumer(consumerConfig);
+        // 3.1 Set the Topic and the filter item set corresponding to the consumption
+        //     if you not need filter consumption,
+        //    set the parameter filterConds is null or empty set
+        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+            pullConsumer.subscribe(entry.getKey(), entry.getValue());
+        }
+        // 3.2 start consumption
+        pullConsumer.completeSubscribe();
+
+        // 4. initial fetch threads
+        Thread[] fetchRunners = new Thread[fetchThreadCnt];
         for (int i = 0; i < fetchRunners.length; i++) {
-            fetchRunners[i] = new Thread(new FetchRequestRunner(messageConsumer, msgCount));
-            fetchRunners[i].setName("_fetch_runner_" + i);
+            fetchRunners[i] = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    ConsumerResult csmResult;
+                    int getCount = consumeCount;
+                    do {
+                        try {
+                            // 4.1 judge consumer is shutdown
+                            if (pullConsumer.isShutdown()) {
+                                logger.warn("Consumer is shutdown!");
+                                break;
+                            }
+                            // 4.2 get messages from server
+                            csmResult = pullConsumer.getMessage();
+                            if (csmResult.isSuccess()) {
+                                // 4.2.1 process message if getMessage() return success
+                                List<Message> messageList = csmResult.getMessageList();
+                                if (messageList != null && !messageList.isEmpty()) {
+                                    msgRecvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+                                }
+                                // 4.2.1.1 confirm consume result
+                                // Notice:
+                                //    1. If the processing fails, the parameter isConsumed can
+                                //       be set to false, but this is likely to cause
+                                //       an infinite loop of data consumption, so
+                                //       it is strongly recommended to set this parameter
+                                //       to true when using it, and the failed data can
+                                //       be processed in other ways
+                                //    2. The messageList returned by getMessage() may be empty,
+                                //       and confirmConsume() is still required call in this case
+                                pullConsumer.confirmConsume(csmResult.getConfirmContext(), true);
+                            } else {
+                                // 4.2.2 process failure when getMessage() return false
+                                //       Any failure can be ignored
+                                if (!IGNORE_ERROR_SET.contains(csmResult.getErrCode())) {
+                                    logger.debug(
+                                            "Receive messages errorCode is {}, Error message is {}",
+                                            csmResult.getErrCode(), csmResult.getErrMsg());
+                                }
+                            }
+                            // 4.3 Determine whether the consumed data reaches the goal
+                            if (consumeCount > 0) {
+                                if (--getCount <= 0) {
+                                    logger.info("Consumer has consumed {} messages!", consumeCount);
+                                    break;
+                                }
+                            }
+                        } catch (Throwable e) {
+                            // Any exceptions in running can be ignored
+                        }
+                    } while (true);
+                    logger.info("The fetch thread has exited!");
+                }
+            }, "_fetch_runner_" + i);
         }
-        // initial fetch threads
+
+        // 5. start fetch threads
         for (Thread thread : fetchRunners) {
             thread.start();
         }
-        // initial statistic thread
+
+        // 6. initial and statistic thread
         Thread statisticThread =
                 new Thread(msgRecvStats, "Sent Statistic Thread");
         statisticThread.start();
     }
 
-    public void subscribe(
-            Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
-        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
-            messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
-        }
-        messagePullConsumer.completeSubscribe();
-    }
-
-    public ConsumerResult getMessage() throws TubeClientException {
-        return messagePullConsumer.getMessage();
-    }
-
-    public ConsumerResult confirmConsume(final String confirmContext,
-                                         boolean isConsumed) throws TubeClientException {
-        return messagePullConsumer.confirmConsume(confirmContext, isConsumed);
-    }
-
-    public Map<String, ConsumeOffsetInfo> getCurrPartitionOffsetMap() throws TubeClientException {
-        return messagePullConsumer.getCurConsumedPartitions();
-    }
-
-    private static class FetchRequestRunner implements Runnable {
-
-        final MessagePullConsumerExample messageConsumer;
-        final int consumeCount;
-
-        FetchRequestRunner(final MessagePullConsumerExample messageConsumer, int msgCount) {
-            this.messageConsumer = messageConsumer;
-            this.consumeCount = msgCount;
-        }
-
-        @Override
-        public void run() {
-            try {
-                int getCount = consumeCount;
-                do {
-                    ConsumerResult result = messageConsumer.getMessage();
-                    if (result.isSuccess()) {
-                        List<Message> messageList = result.getMessageList();
-                        if (messageList != null && !messageList.isEmpty()) {
-                            msgRecvStats.addMsgCount(result.getTopicName(), messageList.size());
-                        }
-                        messageConsumer.confirmConsume(result.getConfirmContext(), true);
-                    } else {
-                        if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
-                            logger.info(
-                                    "Receive messages errorCode is {}, Error message is {}",
-                                    result.getErrCode(), result.getErrMsg());
-                        }
-                    }
-                    if (consumeCount > 0) {
-                        if (--getCount <= 0) {
-                            break;
-                        }
-                    }
-                } while (true);
-                msgRecvStats.stopStats();
-            } catch (TubeClientException e) {
-                logger.error("Create consumer failed!", e);
-            }
-        }
-    }
 }
 
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
index 9074026..f418052 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
@@ -29,10 +29,13 @@ import org.slf4j.LoggerFactory;
  * This demo shows how to collect and report message received statistics.
  */
 public class MsgRecvStats implements Runnable {
-    private static final Logger logger = LoggerFactory.getLogger(MsgRecvStats.class);
-    private static final ConcurrentHashMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
-    private static final ConcurrentHashMap<String, AtomicLong> befCountMap = new ConcurrentHashMap<>();
-    private AtomicBoolean isStarted = new AtomicBoolean(false);
+    private static final Logger logger =
+            LoggerFactory.getLogger(MsgRecvStats.class);
+    private static final ConcurrentHashMap<String, AtomicLong> counterMap =
+            new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<String, AtomicLong> befCountMap =
+            new ConcurrentHashMap<>();
+    private AtomicBoolean isStarted = new AtomicBoolean(true);
 
     @Override
     public void run() {
@@ -40,7 +43,6 @@ public class MsgRecvStats implements Runnable {
             try {
                 for (Map.Entry<String, AtomicLong> entry : counterMap.entrySet()) {
                     long currCount = entry.getValue().get();
-
                     AtomicLong befCount = befCountMap.get(entry.getKey());
                     if (befCount == null) {
                         AtomicLong tmpCount = new AtomicLong(0);
@@ -49,10 +51,11 @@ public class MsgRecvStats implements Runnable {
                             befCount = tmpCount;
                         }
                     }
-
+                    // output received statistic information
                     logger.info("********* Current {} Message receive count is {}, dlt is {}",
                         new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
-
+                    // archive historical statistic data
+                    befCountMap.get(entry.getKey()).set(currCount);
                 }
             } catch (Throwable t) {
                 // ignore
@@ -71,10 +74,6 @@ public class MsgRecvStats implements Runnable {
                     currCount = tmpCount;
                 }
             }
-
-            if (currCount.addAndGet(msgCnt) % 500 == 0) {
-                logger.info("Receive messages:" + currCount.get());
-            }
         }
     }