You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/05/16 03:32:19 UTC

[rocketmq] branch develop updated: [ISSUE #6761] Support reentrant notification orderly for broker (#6762)

This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9dfd4a959 [ISSUE #6761] Support reentrant notification orderly for broker (#6762)
9dfd4a959 is described below

commit 9dfd4a95997bfe0aa0ab6d812f6b4dd01509d8d5
Author: lk <xd...@outlook.com>
AuthorDate: Tue May 16 11:32:11 2023 +0800

    [ISSUE #6761] Support reentrant notification orderly for broker (#6762)
---
 .../broker/processor/NotificationProcessor.java       |  6 ++++--
 .../protocol/header/NotificationRequestHeader.java    | 18 ++++++++++++++++++
 .../apache/rocketmq/test/client/rmq/RMQPopClient.java |  7 +++++++
 .../test/client/consumer/pop/NotificationIT.java      | 19 +++++++++++++++++++
 4 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index d07aadfdb..a15340383 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -174,8 +174,10 @@ public class NotificationProcessor implements NettyRequestProcessor {
     }
 
     private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader requestHeader, int queueId) {
-        if (this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
-            return false;
+        if (Boolean.TRUE.equals(requestHeader.getOrder())) {
+            if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(), requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
+                return false;
+            }
         }
         String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic();
         long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
index 7ac6c500e..5965e9dcb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
@@ -33,6 +33,9 @@ public class NotificationRequestHeader extends TopicQueueRequestHeader {
     @CFNotNull
     private long bornTime;
 
+    private Boolean order = Boolean.FALSE;
+    private String attemptId;
+
     @CFNotNull
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -81,4 +84,19 @@ public class NotificationRequestHeader extends TopicQueueRequestHeader {
         this.queueId = queueId;
     }
 
+    public Boolean getOrder() {
+        return order;
+    }
+
+    public void setOrder(Boolean order) {
+        this.order = order;
+    }
+
+    public String getAttemptId() {
+        return attemptId;
+    }
+
+    public void setAttemptId(String attemptId) {
+        this.attemptId = attemptId;
+    }
 }
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index 74d834681..496bd6da4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -172,12 +172,19 @@ public class RMQPopClient implements MQConsumer {
 
     public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
         String consumerGroup, int queueId, long pollTime, long bornTime, long timeoutMillis) {
+        return notification(brokerAddr, topic, consumerGroup, queueId, null, null, pollTime, bornTime, timeoutMillis);
+    }
+
+    public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
+        String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis) {
         NotificationRequestHeader requestHeader = new NotificationRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
         requestHeader.setPollTime(pollTime);
         requestHeader.setBornTime(bornTime);
+        requestHeader.setOrder(order);
+        requestHeader.setAttemptId(attemptId);
         return this.mqClientAPI.notification(brokerAddr, requestHeader, timeoutMillis);
     }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/NotificationIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/NotificationIT.java
index af6f499cd..072159599 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/NotificationIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/NotificationIT.java
@@ -66,6 +66,25 @@ public class NotificationIT extends BasePop {
         assertThat(result2).isFalse();
     }
 
+    @Test
+    public void testNotificationOrderly() throws Exception {
+        long pollTime = 500;
+        String attemptId = "attemptId";
+        CompletableFuture<Boolean> future1 = client.notification(brokerAddr, topic, group, messageQueue.getQueueId(), true, attemptId, pollTime, System.currentTimeMillis(), 5000);
+        CompletableFuture<Boolean> future2 = client.notification(brokerAddr, topic, group, messageQueue.getQueueId(), true, attemptId, pollTime, System.currentTimeMillis(), 5000);
+        sendMessage(1);
+        Boolean result1 = future1.get();
+        assertThat(result1).isTrue();
+        client.popMessageAsync(brokerAddr, messageQueue, 10000, 1, group, 1000, false,
+            ConsumeInitMode.MIN, true, null, null, attemptId);
+        Boolean result2 = future2.get();
+        assertThat(result2).isTrue();
+
+        String attemptId2 = "attemptId2";
+        CompletableFuture<Boolean> future3 = client.notification(brokerAddr, topic, group, messageQueue.getQueueId(), true, attemptId2, pollTime, System.currentTimeMillis(), 5000);
+        assertThat(future3.get()).isFalse();
+    }
+
     protected void sendMessage(int num) {
         MessageQueueMsg mqMsgs = new MessageQueueMsg(Lists.newArrayList(messageQueue), num);
         producer.send(mqMsgs.getMsgsWithMQ());