Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/05 12:26:59 UTC

[incubator-inlong] branch INLONG-1739 updated: [INLONG-1758] Optimize the realization of class MessagePullSetConsumerExample (#1759)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-1739 by this push:
     new 8b3e625  [INLONG-1758] Optimize the realization of class MessagePullSetConsumerExample (#1759)
8b3e625 is described below

commit 8b3e625f4aa54c837bf817068745744be497e3ca
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri Nov 5 20:26:44 2021 +0800

    [INLONG-1758] Optimize the realization of class MessagePullSetConsumerExample (#1759)
---
 .../tubemq/client/consumer/MessageConsumer.java    |  32 ++
 .../tubemq/example/MessagePullConsumerExample.java |   8 +
 .../example/MessagePullSetConsumerExample.java     | 334 +++++++++++++--------
 .../tubemq/example/MessagePushConsumerExample.java |   8 +-
 4 files changed, 248 insertions(+), 134 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
index 2f3a1ce..df98490 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/MessageConsumer.java
@@ -35,6 +35,11 @@ public interface MessageConsumer extends Shutdownable {
 
     boolean isFilterConsume(String topic);
 
+    /**
+     * Get consume offset information of the currently registered partitions
+     *
+     * @return  consume offset information
+     */
     Map<String, ConsumeOffsetInfo> getCurConsumedPartitions() throws TubeClientException;
 
     /**
@@ -73,8 +78,35 @@ public interface MessageConsumer extends Shutdownable {
      */
     Map<String, Long> getFrozenPartInfo();
 
+    /**
+     * Start consuming messages with the default settings
+     */
     void completeSubscribe() throws TubeClientException;
 
+    /**
+     * Start consumption with precise offset settings
+     *
+     * The parameter sessionKey is specified by the caller, similar to the JobID in Flink;
+     * it is used to distinguish the unrelated offset-reset consumption activities that occur
+     * before and after a reset. Each reset operation must carry a sessionKey different from the one used in the previous reset.
+     *
+     * The parameter sourceCount tells the server how many consumers will take part in
+     * this round of consumer group activation; clients will not consume data until
+     * the consumer group has reached the specified number of consumers.
+     *
+     * The parameter isSelectBig tells the server whether to take the largest or the
+     * smallest offset as the effective value when multiple clients reset the offset
+     * of the same partition.
+     *
+     * The parameter partOffsetMap tells the server that this consumption expects
+     * the partitions in the map to be consumed starting from the specified offset values.
+     * The offsets in the map come either from the consumer's query to the server or from
+     * the data returned on previous successful consumption: the PeerInfo object passed
+     * to the callback function in Push consumption, or the PeerInfo object carried in
+     * the ConsumerResult class in Pull consumption.
+     * The key of the map is the partitionKey carried in PeerInfo, and the value is
+     * the currOffset value carried in PeerInfo.
+     */
     void completeSubscribe(String sessionKey,
                            int sourceCount,
                            boolean isSelectBig,
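
For reference, a caller of the new completeSubscribe(sessionKey, sourceCount, isSelectBig, partOffsetMap)
overload might look like the minimal sketch below. The master address, topic, group name, partition keys,
and offsets are illustrative placeholders, and the single-session factory from the original example is
used for brevity; the saved partition keys and offsets would normally come from the PeerInfo data of an
earlier consumption round, as described in the Javadoc above.

    ConsumerConfig config = new ConsumerConfig("master_ip:8715", "test_group");
    MessageSessionFactory factory = new TubeSingleSessionFactory(config);
    PullMessageConsumer consumer = factory.createPullConsumer(config);
    // pass null (or an empty set) as the filter set when filtered consumption is not needed
    consumer.subscribe("test_topic", null);
    // partition keys follow the "brokerId:topic:partitionId" pattern carried in PeerInfo.getPartitionKey();
    // the values are the currOffset positions to start from (placeholders here)
    Map<String, Long> partOffsetMap = new ConcurrentHashMap<>();
    partOffsetMap.put("123:test_topic:0", 0L);
    // use a sessionKey that differs from the one carried by the previous reset
    consumer.completeSubscribe("reset_round_2", 1, false, partOffsetMap);
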
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
index 0b2d71e..4e9674b 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
@@ -105,6 +105,14 @@ public final class MessagePullConsumerExample {
                 public void run() {
                     ConsumerResult csmResult;
                     int getCount = consumeCount;
+                    // wait until the partition status is ready
+                    do {
+                        if (pullConsumer.isPartitionsReady(5000)
+                                || pullConsumer.isShutdown()) {
+                            break;
+                        }
+                    } while (true);
+                    // consume messages
                     do {
                         try {
                             // 4.1 judge consumer is shutdown
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
index d50b431..2654e75 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
@@ -17,29 +17,31 @@
 
 package org.apache.inlong.tubemq.example;
 
-import static org.apache.inlong.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumeOffsetInfo;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
 import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
 import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.cluster.MasterInfo;
 import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This demo shows how to reset offset on consuming. The main difference from {@link MessagePullConsumerExample}
+ * This demo shows how to consume messages from a bootstrap offset through
+ * TubeMultiSessionFactory + PullMessageConsumer.
+ *
+ * First, we start a consumer for data consumption and then shut it down; after that we start another
+ * consumer with the same group name and set each partition's bootstrap offset to 0 before consuming again.
+ *
+ * The main difference from {@link MessagePullConsumerExample}
  * is that we call {@link PullMessageConsumer#completeSubscribe(String, int, boolean, Map)} instead of
  * {@link PullMessageConsumer#completeSubscribe()}. The former supports multiple options to configure
  * when to reset offset.
@@ -48,141 +50,213 @@ public final class MessagePullSetConsumerExample {
 
     private static final Logger logger =
             LoggerFactory.getLogger(MessagePullSetConsumerExample.class);
-    private static final AtomicLong counter = new AtomicLong(0);
-
-    private final PullMessageConsumer messagePullConsumer;
-    private final MessageSessionFactory messageSessionFactory;
-
-    public MessagePullSetConsumerExample(String masterHostAndPort,
-                                         String group) throws Exception {
-        ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
-        this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
-        this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
-    }
+    // Statistic object
+    private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+    // The map from master cluster to session factory
+    //    There may be multiple consumers in the same process whose subscribed topic sets
+    //    belong to different clusters. In this case, we need to construct the session
+    //    factories through the TubeMultiSessionFactory class.
+    private static final ConcurrentHashMap<String, MessageSessionFactory> multSessFtyMap =
+            new ConcurrentHashMap<>();
 
-    public static void main(String[] args) {
-        // get and initial parameters
+    // The 1st parameter masterServers is the master address(es) to connect to,
+    //     in the format master1_ip:port[,master2_ip:port];
+    // The 2nd parameter subTopicAndFiterItems is the topic(s) (and filter condition set) to consume,
+    //     in the format topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]];
+    // The 3rd parameter groupName is the name of the consumer group;
+    // The 4th parameter consumeCount is the number of messages that need to be consumed;
+    // The 5th parameter fetchThreadCnt is the number of fetch threads
+    public static void main(String[] args) throws Throwable {
+        // 1. get and initial parameters
         final String masterServers = args[0];
-        final String topics = args[1];
-        final String group = args[2];
-        final int msgCount = Integer.parseInt(args[3]);
+        final String subTopicAndFiterItems = args[1];
+        final String groupName = args[2];
+        final int consumeCount = Integer.parseInt(args[3]);
+        int fetchThreadCnt = 3;
+        if (args.length > 4) {
+            fetchThreadCnt = MixedUtils.mid(Integer.parseInt(args[4]),
+                    1, Runtime.getRuntime().availableProcessors());
+        }
         final Map<String, TreeSet<String>> topicAndFiltersMap =
-                MixedUtils.parseTopicParam(topics);
-        // initial reset offset parameters
-        // (The offset specified is only a demo)
-        final Map<String, Long> partOffsetMap =
+                MixedUtils.parseTopicParam(subTopicAndFiterItems);
+
+        // 2. initialize and start the statistics thread
+        Thread statisticThread =
+                new Thread(msgRecvStats, "Sent Statistic Thread");
+        statisticThread.start();
+
+        // 3. Start the consumer group for the first consumption
+        // 3.1. build consumer object
+        logger.info("The first consumption begin!");
+        PullMessageConsumer consumer1 =
+                createPullConsumer(masterServers, groupName);
+        // 3.2. Set the topic and the filter item set for the consumption;
+        //     if you do not need filtered consumption,
+        //     set the parameter filterConds to null or an empty set
+        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+            consumer1.subscribe(entry.getKey(), entry.getValue());
+        }
+        // 3.3. start consumption with the specified session settings
+        String sessionKeyFst = "test_consume_first";
+        int sourceCountFst = 1;
+        boolean isSelectBig = false;
+        // The map of partitionKey to the last successfully consumed offset
+        //    You can persist this information and reuse it when restarting or
+        //    rolling the consumer so that the new consumption starts from
+        //    the offset required by the previous round
+        ConcurrentHashMap<String, Long> partOffsetMapFst =
                 new ConcurrentHashMap<>();
-        partOffsetMap.put("123:test_1:0", 0L);
-        partOffsetMap.put("123:test_1:1", 0L);
-        partOffsetMap.put("123:test_1:2", 0L);
-        partOffsetMap.put("123:test_2:0", 350L);
-        partOffsetMap.put("123:test_2:1", 350L);
-        partOffsetMap.put("123:test_2:2", 350L);
-
-        ExecutorService executorService = Executors.newCachedThreadPool();
-        executorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    int getCount = msgCount;
-                    MessagePullSetConsumerExample messageConsumer =
-                        new MessagePullSetConsumerExample(masterServers, group);
-                    messageConsumer.subscribe(topicAndFiltersMap, partOffsetMap);
-                    // main logic of consuming
-                    do {
-                        ConsumerResult result = messageConsumer.getMessage();
-                        if (result.isSuccess()) {
-                            List<Message> messageList = result.getMessageList();
-                            if (messageList != null) {
-                                logger.info("Receive messages:" + counter.addAndGet(messageList.size()));
-                            }
-
-                            // Offset returned by GetMessage represents the initial offset of this request
-                            // if consumer group is pure Pull mode, the initial offset can be saved;
-                            // if not, we have to use the return value of confirmConsume
-                            long oldValue = partOffsetMap.get(
-                                result.getPartitionKey()) == null
-                                ? -1
-                                : partOffsetMap.get(result.getPartitionKey());
-                            partOffsetMap.put(result.getPartitionKey(), result.getCurrOffset());
-                            logger.info(
-                                "GetMessage , partitionKey={}, oldValue={}, newVal={}",
-                                new Object[]{result.getPartitionKey(), oldValue, result.getCurrOffset()});
-
-                            // save the Offset from the return value of confirmConsume
-                            ConsumerResult confirmResult = messageConsumer.confirmConsume(
-                                result.getConfirmContext(),
-                                true);
-                            if (confirmResult.isSuccess()) {
-                                oldValue = partOffsetMap.get(
-                                    result.getPartitionKey()) == null
-                                    ? -1
-                                    : partOffsetMap.get(result.getPartitionKey());
-                                partOffsetMap.put(result.getPartitionKey(), confirmResult.getCurrOffset());
-                                logger.info(
-                                    "ConfirmConsume , partitionKey={}, oldValue={}, newVal={}",
-                                    new Object[]{
-                                        confirmResult.getPartitionKey(),
-                                        oldValue,
-                                        confirmResult.getCurrOffset()});
-                            } else {
-                                logger.info(
-                                    "ConfirmConsume failure, errCode is {}, errInfo is {}.",
-                                    confirmResult.getErrCode(),
-                                    confirmResult.getErrMsg());
-                            }
-                        } else {
-                            if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
-                                logger.info(
-                                        "Receive messages errorCode is {}, Error message is {}",
-                                        result.getErrCode(), result.getErrMsg());
-                            }
-                        }
-                        if (msgCount >= 0) {
-                            if (--getCount <= 0) {
-                                break;
-                            }
-                        }
-                    } while (true);
-                } catch (Exception e) {
-                    logger.error("Create consumer failed!", e);
-                }
-            }
-        });
+        consumer1.completeSubscribe(sessionKeyFst,
+                sourceCountFst, isSelectBig, partOffsetMapFst);
 
-        executorService.shutdown();
-        try {
-            executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            logger.error("Thread Pool shutdown has been interrupted!");
+        // 3.4. initialize and start fetch threads
+        Thread[] fetchRunners1 = new Thread[fetchThreadCnt];
+        for (int i = 0; i < fetchRunners1.length; i++) {
+            fetchRunners1[i] = new Thread(new FetchRequestRunner(
+                    consumer1, partOffsetMapFst, consumeCount), "_fetch_runner_" + i);
+        }
+        for (Thread thread : fetchRunners1) {
+            thread.start();
+        }
+
+        // 3.5. wait for data to be consumed
+        ThreadUtils.sleep(2 * 60 * 1000);
+        logger.info("The first consumption has finished!");
+
+        // 3.6. shutdown consumer
+        consumer1.shutdown();
+        for (Thread thread : fetchRunners1) {
+            thread.join();
         }
-    }
 
-    public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap,
-                          Map<String, Long> partOffsetMap) throws TubeClientException {
+        logger.info("The second consumption begin!");
+        // 4. Start the consumer group for the second consumption
+        // 4.1 set the bootstrap offset; here we set consumption to start from 0
+        String sessionKeySec = "test_consume_Second";
+        int sourceCountSec = 1;
+        ConcurrentHashMap<String, Long> partOffsetMapSec =
+                new ConcurrentHashMap<>();
+        for (String partKey : partOffsetMapFst.keySet()) {
+            partOffsetMapSec.put(partKey, 0L);
+        }
+        // 4.2. build consumer object
+        PullMessageConsumer consumer2 =
+                createPullConsumer(masterServers, groupName);
+        // 4.3. Set the Topic and the filter item set corresponding to the consumption
         for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
-            messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
+            consumer2.subscribe(entry.getKey(), entry.getValue());
         }
-        String sessionKey = "test_reset2";
-        int consumerCount = 2;
-        boolean isSelectBig = false;
-        messagePullConsumer.completeSubscribe(sessionKey,
-                consumerCount, isSelectBig, partOffsetMap);
-    }
+        consumer2.completeSubscribe(sessionKeySec,
+                sourceCountSec, isSelectBig, partOffsetMapSec);
 
-    public ConsumerResult getMessage() throws TubeClientException {
-        return messagePullConsumer.getMessage();
+        // 4.4. initialize and start fetch threads
+        Thread[] fetchRunners2 = new Thread[fetchThreadCnt];
+        for (int i = 0; i < fetchRunners2.length; i++) {
+            fetchRunners2[i] = new Thread(new FetchRequestRunner(
+                    consumer2, partOffsetMapSec, consumeCount), "_fetch_runner_" + i);
+        }
+        for (Thread thread : fetchRunners2) {
+            thread.start();
+        }
     }
 
-    public ConsumerResult confirmConsume(
-        String confirmContext,
-        boolean isConsumed
-    ) throws TubeClientException {
-        return messagePullConsumer.confirmConsume(confirmContext, isConsumed);
+    //  consumer object creation function
+    private static PullMessageConsumer createPullConsumer(
+            String masterHostAndPorts, String groupName) throws Exception {
+        // 1. initialize the consumer configuration object
+        ConsumerConfig consumerConfig =
+                new ConsumerConfig(masterHostAndPorts, groupName);
+        // 1.2 set consumption from the latest position if this is the consumer group's first consumption
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        // 2. build session factory object
+        //    find or initialize the TubeMultiSessionFactory object according to the master cluster information
+        MasterInfo masterInfo = consumerConfig.getMasterInfo();
+        MessageSessionFactory sessionFactory =
+                multSessFtyMap.get(masterInfo.getMasterClusterStr());
+        if (sessionFactory == null) {
+            MessageSessionFactory tmpSessionFactory =
+                    new TubeMultiSessionFactory(consumerConfig);
+            sessionFactory = multSessFtyMap.putIfAbsent(
+                    masterInfo.getMasterClusterStr(), tmpSessionFactory);
+            if (sessionFactory == null) {
+                sessionFactory = tmpSessionFactory;
+            } else {
+                tmpSessionFactory.shutdown();
+            }
+        }
+        // 3. Create and get the PullMessageConsumer object
+        return sessionFactory.createPullConsumer(consumerConfig);
     }
 
-    public Map<String, ConsumeOffsetInfo> getCurrPartitionOffsetMap() throws TubeClientException {
-        return messagePullConsumer.getCurConsumedPartitions();
+    // fetch message runner
+    private static class FetchRequestRunner implements Runnable {
+
+        final PullMessageConsumer pullConsumer;
+        final ConcurrentHashMap<String, Long> partOffsetMap;
+        final int consumeCount;
+
+        FetchRequestRunner(PullMessageConsumer messageConsumer,
+                           ConcurrentHashMap<String, Long> partOffsetMap,
+                           int msgCount) {
+            this.pullConsumer = messageConsumer;
+            this.partOffsetMap = partOffsetMap;
+            this.consumeCount = msgCount;
+        }
+
+        @Override
+        public void run() {
+            ConsumerResult csmResult;
+            ConsumerResult cfmResult;
+            int getCount = consumeCount;
+            // wait until the partition status is ready
+            do {
+                if (pullConsumer.isPartitionsReady(5000)
+                        || pullConsumer.isShutdown()) {
+                    break;
+                }
+            } while (true);
+            // consume messages
+            do {
+                try {
+                    // 1 check whether the consumer has been shut down
+                    if (pullConsumer.isShutdown()) {
+                        logger.warn("Consumer is shutdown!");
+                        break;
+                    }
+                    // 2 get messages from server
+                    csmResult = pullConsumer.getMessage();
+                    if (csmResult.isSuccess()) {
+                        // 2.1 process messages if getMessage() returns success
+                        List<Message> messageList = csmResult.getMessageList();
+                        if (messageList != null && !messageList.isEmpty()) {
+                            msgRecvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+                        }
+                        // 2.1.2 store the offset of the processed messages
+                        //       the offset returned by getMessage() represents the initial offset of this request;
+                        //       if the consumer group is in pure Pull mode, the initial offset can be saved;
+                        //       if not, we have to use the return value of confirmConsume()
+                        partOffsetMap.put(csmResult.getPartitionKey(), csmResult.getCurrOffset());
+                        // 2.1.3 confirm consume result
+                        cfmResult = pullConsumer.confirmConsume(
+                                csmResult.getConfirmContext(), true);
+                        if (cfmResult.isSuccess()) {
+                            // store confirmed offset value
+                            partOffsetMap.put(csmResult.getPartitionKey(), cfmResult.getCurrOffset());
+                        }
+                    }
+                    // 3. Determine whether the consumed data reaches the goal
+                    if (consumeCount > 0) {
+                        if (--getCount <= 0) {
+                            logger.info("Consumer has consumed {} messages!", consumeCount);
+                            break;
+                        }
+                    }
+                } catch (Throwable e) {
+                    // Any exceptions thrown during consumption are ignored in this demo
+                }
+            } while (true);
+            logger.info("The fetch thread has exited!");
+        }
     }
 }
 
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
index 0bdc5c7..0db4acd 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
@@ -91,17 +91,17 @@ public final class MessagePushConsumerExample {
 
         // 3. build and start consumer object
         for (int i = 0; i < clientCount; i++) {
-            // 2. build and start consumer object
+            // 3.1. build and start consumer object
             PushMessageConsumer consumer =
                     sessionFactory.createPushConsumer(consumerConfig);
-            // 2.1 set subscribed topic and Listener
+            // 3.2. set subscribed topic and Listener
             for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
                 MessageListener messageListener = new DefaultMessageListener(entry.getKey());
                 consumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
             }
-            // 2.2 start consumer
+            // 3.3. start consumer
             consumer.completeSubscribe();
-            // 2.3 store consumer object
+            // 3.4. store consumer object
             consumerMap.put(consumer.getConsumerId(), consumer);
         }