You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/24 09:08:48 UTC

[rocketmq] 02/02: [ISSUE #4805] In subscribe mode, a user-defined MessageQueueListener is supported. You can also specify the offset to commit for each MessageQueue (#4820)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 9872667a7671ee1c7a00c5a0ba8a1b24db691a6c
Author: xiaoyi <su...@163.com>
AuthorDate: Thu Aug 25 09:08:23 2022 +0800

    [ISSUE #4805] In subscribe mode, a user-defined MessageQueueListener is supported. You can also specify the offset to commit for each MessageQueue (#4820)
    
    (cherry picked from commit a1d2a7325505d572a80f4fe18928a1997a3abbcc)
---
 .../client/consumer/DefaultLitePullConsumer.java   |  36 ++++++-
 .../rocketmq/client/consumer/LitePullConsumer.java |  23 +++++
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 107 ++++++++++++++++++---
 .../consumer/DefaultLitePullConsumerTest.java      |   5 +-
 4 files changed, 155 insertions(+), 16 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 6b8d1b4ae..76acd6338 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.consumer;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
@@ -265,7 +266,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     public void unsubscribe(String topic) {
         this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
     }
-
     @Override
     public void assign(Collection<MessageQueue> messageQueues) {
         defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
@@ -322,6 +322,40 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
         this.defaultLitePullConsumerImpl.commitAll();
     }
 
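+    /**
+     * Commits the specified offsets for a batch of message queues.
+     * @param offsetMap the offset to commit for each message queue
+     * @param persist whether to persist the offsets after they are updated
+     */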
+    @Override
+    public void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist) {
+        this.defaultLitePullConsumerImpl.commit(offsetMap, persist);
+    }
+
+    /**
+     * Gets the message queues assigned to this consumer in subscribe mode.
+     *
+     * @return the assigned message queues
+     * @throws MQClientException declared by {@link LitePullConsumer#assignment()}
+     */
+    @Override
+    public Set<MessageQueue> assignment() throws MQClientException {
+        return this.defaultLitePullConsumerImpl.assignment();
+    }
+
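+    /**
+     * Subscribes to a topic with a subscription expression and a custom message queue listener.
+     *
+     * @param topic the topic to subscribe to
+     * @param subExpression the subscription expression for the topic
+     * @param messageQueueListener the listener to invoke when the topic's message queues change
+     */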
+    @Override
+    public void subscribe(String topic, String subExpression, MessageQueueListener messageQueueListener) throws MQClientException {
+        this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression, messageQueueListener);
+    }
+
+
     @Override
     public void commit(final Set<MessageQueue> messageQueues, boolean persist) {
         this.defaultLitePullConsumerImpl.commit(messageQueues, persist);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index 8bca31c78..e9e67d055 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public interface LitePullConsumer {
@@ -52,6 +53,14 @@ public interface LitePullConsumer {
      */
     void subscribe(final String topic, final String subExpression) throws MQClientException;
 
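+    /**
+     * Subscribe some topic with subExpression and a user-defined messageQueueListener.
+     * @param topic the topic to subscribe to
+     * @param subExpression the subscription expression for the topic
+     * @param messageQueueListener the listener to invoke when the topic's message queues change
+     */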
+    void subscribe(final String topic, final String subExpression, final MessageQueueListener messageQueueListener) throws MQClientException;
+
     /**
      * Subscribe some topic with selector.
      *
@@ -67,6 +76,14 @@ public interface LitePullConsumer {
      */
     void unsubscribe(final String topic);
 
+
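+    /**
+     * Get the message queues assigned to this consumer in subscribe mode.
+     * @return the assigned message queues
+     * @throws MQClientException if an implementation fails to obtain the assigned message queues
+     */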
+    Set<MessageQueue> assignment() throws MQClientException;
+
     /**
      * Manually assign a list of message queues to this consumer. This interface does not allow for incremental
      * assignment and will replace the previous assignment (if there is one).
@@ -170,6 +187,12 @@ public interface LitePullConsumer {
      */
     void commitSync();
 
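+    /**
+     * Commit the specified offsets for a batch of message queues.
+     * @param offsetMap the offset to commit for each message queue
+     * @param persist whether to persist the offsets after they are updated
+     */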
+    void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist);
 
     void commit(final Set<MessageQueue> messageQueues, boolean persist);
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index f3cd7d5b8..267ee1c77 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -241,19 +241,23 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     class MessageQueueListenerImpl implements MessageQueueListener {
         @Override
         public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
-            MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
-            switch (messageModel) {
-                case BROADCASTING:
-                    updateAssignedMessageQueue(topic, mqAll);
-                    updatePullTask(topic, mqAll);
-                    break;
-                case CLUSTERING:
-                    updateAssignedMessageQueue(topic, mqDivided);
-                    updatePullTask(topic, mqDivided);
-                    break;
-                default:
-                    break;
-            }
+            updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
+        }
+    }
+
+    public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+        MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
+        switch (messageModel) {
+            case BROADCASTING:
+                updateAssignedMessageQueue(topic, mqAll);
+                updatePullTask(topic, mqAll);
+                break;
+            case CLUSTERING:
+                updateAssignedMessageQueue(topic, mqDivided);
+                updatePullTask(topic, mqDivided);
+                break;
+            default:
+                break;
         }
     }
 
@@ -472,6 +476,41 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         }
     }
 
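+    /**
+     * Subscribes to a topic with a custom messageQueueListener, which is called after the assigned queues and pull tasks are updated.
+     * @param topic the topic to subscribe to; must not be null or empty
+     * @param subExpression the subscription expression passed to FilterAPI.buildSubscriptionData
+     * @param messageQueueListener the custom listener; must not be null because it is invoked on every queue change
+     * @throws MQClientException wrapping any exception raised while subscribing
+     */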
+    public synchronized void subscribe(String topic, String subExpression, final MessageQueueListener messageQueueListener) throws MQClientException {
+        try {
+            if (StringUtils.isEmpty(topic)) {
+                throw new IllegalArgumentException("Topic can not be null or empty.");
+            }
+            setSubscriptionType(SubscriptionType.SUBSCRIBE);
+            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
+            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+            this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListener() {
+                @Override
+                public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+                    // First, update the assign queue
+                    updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
+                    // run custom listener
+                    messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
+                }
+            });
+            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
+            if (serviceState == ServiceState.RUNNING) {
+                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+                updateTopicSubscribeInfoWhenSubscriptionChanged();
+            }
+        } catch (Exception e) {
+            throw new MQClientException("subscribe exception", e);
+        }
+    }
+
+
     public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
         try {
             if (topic == null || "".equals(topic)) {
@@ -673,6 +712,42 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         }
     }
 
+    /**
+     * Commits the given offset for each queue; a null or -1 offset, or a missing/dropped process queue, skips that entry.
+     * @param messageQueues the offset to commit per message queue; a null or empty map is ignored with a warning
+     * @param persist when true, calls offsetStore.persistAll on all keys of the map after the offsets are updated
+     */
+    public synchronized void commit(final Map<MessageQueue, Long> messageQueues, boolean persist) {
+        if (messageQueues == null || messageQueues.isEmpty()) {
+            log.warn("MessageQueues is empty, Ignore this commit ");
+            return;
+        }
+        for (Map.Entry<MessageQueue, Long> messageQueueEntry : messageQueues.entrySet()) {
+            MessageQueue messageQueue = messageQueueEntry.getKey();
+            Long offset = messageQueueEntry.getValue(); // boxed on purpose: a null value must not be unboxed
+            if (offset != null && offset != -1) {
+                ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+                if (processQueue != null && !processQueue.isDropped()) {
+                    updateConsumeOffset(messageQueue, offset);
+                }
+            } else {
+                log.error("consumerOffset is " + offset + " in messageQueue [" + messageQueue + "].");
+            }
+        }
+
+        if (persist) {
+            this.offsetStore.persistAll(messageQueues.keySet());
+        }
+    }
+
+    /**
+     * Gets the message queues assigned in subscribe mode.
+     * @return the assigned message queues held by assignedMessageQueue
+     */
+    public synchronized Set<MessageQueue> assignment() {
+        return assignedMessageQueue.getAssignedMessageQueues();
+    }
+
     public synchronized void commit(final Set<MessageQueue> messageQueues, boolean persist) {
         if (messageQueues == null || messageQueues.size() == 0) {
             return;
@@ -1150,8 +1225,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         return isEqual;
     }
 
+    public AssignedMessageQueue getAssignedMessageQueue() {
+        return assignedMessageQueue;
+    }
+
     public synchronized void registerTopicMessageQueueChangeListener(String topic,
-        TopicMessageQueueChangeListener listener) throws MQClientException {
+                                                                     TopicMessageQueueChangeListener listener) throws MQClientException {
         if (topic == null || listener == null) {
             throw new MQClientException("Topic or listener is null", null);
         }
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 0f0327f0f..2c79ed007 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.consumer;
 
+import java.util.ArrayList;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
@@ -704,7 +705,9 @@ public class DefaultLitePullConsumerTest {
                         messageClientExt.setOffsetMsgId("234");
                         messageClientExt.setBornHost(new InetSocketAddress(8080));
                         messageClientExt.setStoreHost(new InetSocketAddress(8080));
-                        PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt));
+                        List<MessageExt> list = new ArrayList<MessageExt>();
+                        list.add(messageClientExt);
+                        PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, list);
                         return pullResult;
                     }
                 });