You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/24 09:08:48 UTC
[rocketmq] 02/02: [ISSUE #4805]In the subscribe mode, user-defined MessageQueueListener is supported. At the same time, you can specify the offset that the MessageQueue will commit (#4820)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 9872667a7671ee1c7a00c5a0ba8a1b24db691a6c
Author: xiaoyi <su...@163.com>
AuthorDate: Thu Aug 25 09:08:23 2022 +0800
[ISSUE #4805]In the subscribe mode, user-defined MessageQueueListener is supported. At the same time, you can specify the offset that the MessageQueue will commit (#4820)
(cherry picked from commit a1d2a7325505d572a80f4fe18928a1997a3abbcc)
---
.../client/consumer/DefaultLitePullConsumer.java | 36 ++++++-
.../rocketmq/client/consumer/LitePullConsumer.java | 23 +++++
.../impl/consumer/DefaultLitePullConsumerImpl.java | 107 ++++++++++++++++++---
.../consumer/DefaultLitePullConsumerTest.java | 5 +-
4 files changed, 155 insertions(+), 16 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 6b8d1b4ae..76acd6338 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
@@ -265,7 +266,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
}
-
@Override
public void assign(Collection<MessageQueue> messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
@@ -322,6 +322,40 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.defaultLitePullConsumerImpl.commitAll();
}
+ /**
+ * Offset specified by batch commit
+ * @param offsetMap
+ * @param persist
+ */
+ @Override
+ public void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist) {
+ this.defaultLitePullConsumerImpl.commit(offsetMap, persist);
+ }
+
+ /**
+ * Get the MessageQueue assigned in subscribe mode
+ *
+ * @return
+ * @throws MQClientException
+ */
+ @Override
+ public Set<MessageQueue> assignment() throws MQClientException {
+ return this.defaultLitePullConsumerImpl.assignment();
+ }
+
+ /**
+ * Subscribe some topic with subExpression and messageQueueListener
+ *
+ * @param topic
+ * @param subExpression
+ * @param messageQueueListener
+ */
+ @Override
+ public void subscribe(String topic, String subExpression, MessageQueueListener messageQueueListener) throws MQClientException {
+ this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression, messageQueueListener);
+ }
+
+
@Override
public void commit(final Set<MessageQueue> messageQueues, boolean persist) {
this.defaultLitePullConsumerImpl.commit(messageQueues, persist);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index 8bca31c78..e9e67d055 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public interface LitePullConsumer {
@@ -52,6 +53,14 @@ public interface LitePullConsumer {
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
+ /**
+ * Subscribe some topic with subExpression and messageQueueListener
+ * @param topic
+ * @param subExpression
+ * @param messageQueueListener
+ */
+ void subscribe(final String topic, final String subExpression, final MessageQueueListener messageQueueListener) throws MQClientException;
+
/**
* Subscribe some topic with selector.
*
@@ -67,6 +76,14 @@ public interface LitePullConsumer {
*/
void unsubscribe(final String topic);
+
+ /**
+ * subscribe mode, get assigned MessageQueue
+ * @return
+ * @throws MQClientException
+ */
+ Set<MessageQueue> assignment() throws MQClientException;
+
/**
* Manually assign a list of message queues to this consumer. This interface does not allow for incremental
* assignment and will replace the previous assignment (if there is one).
@@ -170,6 +187,12 @@ public interface LitePullConsumer {
*/
void commitSync();
+ /**
+ * Offset specified by batch commit
+ * @param offsetMap
+ * @param persist
+ */
+ void commitSync(Map<MessageQueue, Long> offsetMap, boolean persist);
void commit(final Set<MessageQueue> messageQueues, boolean persist);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index f3cd7d5b8..267ee1c77 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -241,19 +241,23 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
class MessageQueueListenerImpl implements MessageQueueListener {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
- MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
- switch (messageModel) {
- case BROADCASTING:
- updateAssignedMessageQueue(topic, mqAll);
- updatePullTask(topic, mqAll);
- break;
- case CLUSTERING:
- updateAssignedMessageQueue(topic, mqDivided);
- updatePullTask(topic, mqDivided);
- break;
- default:
- break;
- }
+ updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
+ }
+ }
+
+ public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+ MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
+ switch (messageModel) {
+ case BROADCASTING:
+ updateAssignedMessageQueue(topic, mqAll);
+ updatePullTask(topic, mqAll);
+ break;
+ case CLUSTERING:
+ updateAssignedMessageQueue(topic, mqDivided);
+ updatePullTask(topic, mqDivided);
+ break;
+ default:
+ break;
}
}
@@ -472,6 +476,41 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
+ /**
+ * subscribe data by customizing messageQueueListener
+ * @param topic
+ * @param subExpression
+ * @param messageQueueListener
+ * @throws MQClientException
+ */
+ public synchronized void subscribe(String topic, String subExpression, final MessageQueueListener messageQueueListener) throws MQClientException {
+ try {
+ if (StringUtils.isEmpty(topic)) {
+ throw new IllegalArgumentException("Topic can not be null or empty.");
+ }
+ setSubscriptionType(SubscriptionType.SUBSCRIBE);
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
+ this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+ this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListener() {
+ @Override
+ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+ // First, update the assign queue
+ updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
+ // run custom listener
+ messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
+ }
+ });
+ assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
+ if (serviceState == ServiceState.RUNNING) {
+ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+ updateTopicSubscribeInfoWhenSubscriptionChanged();
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscribe exception", e);
+ }
+ }
+
+
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
if (topic == null || "".equals(topic)) {
@@ -673,6 +712,42 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
+ /**
+ * Specify offset commit
+ * @param messageQueues
+ * @param persist
+ */
+ public synchronized void commit(final Map<MessageQueue, Long> messageQueues, boolean persist) {
+ if (messageQueues == null || messageQueues.size() == 0) {
+ log.warn("MessageQueues is empty, Ignore this commit ");
+ return;
+ }
+ for (Map.Entry<MessageQueue, Long> messageQueueEntry : messageQueues.entrySet()) {
+ MessageQueue messageQueue = messageQueueEntry.getKey();
+ long offset = messageQueueEntry.getValue();
+ if (offset != -1) {
+ ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+ if (processQueue != null && !processQueue.isDropped()) {
+ updateConsumeOffset(messageQueue, offset);
+ }
+ } else {
+ log.error("consumerOffset is -1 in messageQueue [" + messageQueue + "].");
+ }
+ }
+
+ if (persist) {
+ this.offsetStore.persistAll(messageQueues.keySet());
+ }
+ }
+
+ /**
+ * Get the queue assigned in subscribe mode
+ * @return
+ */
+ public synchronized Set<MessageQueue> assignment() {
+ return assignedMessageQueue.getAssignedMessageQueues();
+ }
+
public synchronized void commit(final Set<MessageQueue> messageQueues, boolean persist) {
if (messageQueues == null || messageQueues.size() == 0) {
return;
@@ -1150,8 +1225,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return isEqual;
}
+ public AssignedMessageQueue getAssignedMessageQueue() {
+ return assignedMessageQueue;
+ }
+
public synchronized void registerTopicMessageQueueChangeListener(String topic,
- TopicMessageQueueChangeListener listener) throws MQClientException {
+ TopicMessageQueueChangeListener listener) throws MQClientException {
if (topic == null || listener == null) {
throw new MQClientException("Topic or listener is null", null);
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 0f0327f0f..2c79ed007 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.consumer;
+import java.util.ArrayList;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
@@ -704,7 +705,9 @@ public class DefaultLitePullConsumerTest {
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
- PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt));
+ List<MessageExt> list = new ArrayList<MessageExt>();
+ list.add(messageClientExt);
+ PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, list);
return pullResult;
}
});