You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/05/05 07:06:19 UTC

[rocketmq] branch develop updated: [ISSUE #6691] Support reentrant pop orderly for broker (#6692)

This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 57642bc63 [ISSUE #6691] Support reentrant pop orderly for broker (#6692)
57642bc63 is described below

commit 57642bc630d5ee42cca026ae389ae3016a61bb9c
Author: lk <xd...@outlook.com>
AuthorDate: Fri May 5 15:06:11 2023 +0800

    [ISSUE #6691] Support reentrant pop orderly for broker (#6692)
---
 .../broker/offset/ConsumerOrderInfoManager.java    | 37 ++++++++++----
 .../broker/processor/AckMessageProcessor.java      |  2 +-
 .../broker/processor/NotificationProcessor.java    |  2 +-
 .../broker/processor/PopMessageProcessor.java      | 14 +++---
 ...ConsumerOrderInfoManagerLockFreeNotifyTest.java |  5 ++
 .../offset/ConsumerOrderInfoManagerTest.java       | 57 ++++++++++++++++++----
 .../protocol/header/PopMessageRequestHeader.java   | 11 +++++
 .../rocketmq/test/client/rmq/RMQPopClient.java     |  7 +++
 .../test/client/consumer/pop/BasePopOrderly.java   | 19 +++++++-
 .../test/client/consumer/pop/PopOrderlyIT.java     | 38 +++++++++++++++
 .../rocketmq/test/offset/OffsetResetForPopIT.java  |  8 +--
 11 files changed, 167 insertions(+), 33 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 29bbe9970..2e2850dbb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -91,7 +91,7 @@ public class ConsumerOrderInfoManager extends ConfigManager {
      * @param msgQueueOffsetList the queue offsets of messages
      * @param orderInfoBuilder will append order info to this builder
      */
-    public void update(boolean isRetry, String topic, String group, int queueId, long popTime, long invisibleTime,
+    public void update(String attemptId, boolean isRetry, String topic, String group, int queueId, long popTime, long invisibleTime,
         List<Long> msgQueueOffsetList, StringBuilder orderInfoBuilder) {
         String key = buildKey(topic, group);
         ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
@@ -106,12 +106,12 @@ public class ConsumerOrderInfoManager extends ConfigManager {
         OrderInfo orderInfo = qs.get(queueId);
 
         if (orderInfo != null) {
-            OrderInfo newOrderInfo = new OrderInfo(popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
-            newOrderInfo.mergeOffsetConsumedCount(orderInfo.offsetList, orderInfo.offsetConsumedCount);
+            OrderInfo newOrderInfo = new OrderInfo(attemptId, popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
+            newOrderInfo.mergeOffsetConsumedCount(orderInfo.attemptId, orderInfo.offsetList, orderInfo.offsetConsumedCount);
 
             orderInfo = newOrderInfo;
         } else {
-            orderInfo = new OrderInfo(popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
+            orderInfo = new OrderInfo(attemptId, popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
         }
         qs.put(queueId, orderInfo);
 
@@ -140,7 +140,7 @@ public class ConsumerOrderInfoManager extends ConfigManager {
         updateLockFreeTimestamp(topic, group, queueId, orderInfo);
     }
 
-    public boolean checkBlock(String topic, String group, int queueId, long invisibleTime) {
+    public boolean checkBlock(String attemptId, String topic, String group, int queueId, long invisibleTime) {
         String key = buildKey(topic, group);
         ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
         if (qs == null) {
@@ -156,7 +156,7 @@ public class ConsumerOrderInfoManager extends ConfigManager {
         if (orderInfo == null) {
             return false;
         }
-        return orderInfo.needBlock(invisibleTime);
+        return orderInfo.needBlock(attemptId, invisibleTime);
     }
 
     public void clearBlock(String topic, String group, int queueId) {
@@ -391,17 +391,20 @@ public class ConsumerOrderInfoManager extends ConfigManager {
          */
         @JSONField(name = "cm")
         private long commitOffsetBit;
+        @JSONField(name = "a")
+        private String attemptId;
 
         public OrderInfo() {
         }
 
-        public OrderInfo(long popTime, long invisibleTime, List<Long> queueOffsetList, long lastConsumeTimestamp,
+        public OrderInfo(String attemptId, long popTime, long invisibleTime, List<Long> queueOffsetList, long lastConsumeTimestamp,
             long commitOffsetBit) {
             this.popTime = popTime;
             this.invisibleTime = invisibleTime;
             this.offsetList = buildOffsetList(queueOffsetList);
             this.lastConsumeTimestamp = lastConsumeTimestamp;
             this.commitOffsetBit = commitOffsetBit;
+            this.attemptId = attemptId;
         }
 
         public List<Long> getOffsetList() {
@@ -460,6 +463,14 @@ public class ConsumerOrderInfoManager extends ConfigManager {
             this.offsetConsumedCount = offsetConsumedCount;
         }
 
+        public String getAttemptId() {
+            return attemptId;
+        }
+
+        public void setAttemptId(String attemptId) {
+            this.attemptId = attemptId;
+        }
+
         public static List<Long> buildOffsetList(List<Long> queueOffsetList) {
             List<Long> simple = new ArrayList<>();
             if (queueOffsetList.size() == 1) {
@@ -475,10 +486,13 @@ public class ConsumerOrderInfoManager extends ConfigManager {
         }
 
         @JSONField(serialize = false, deserialize = false)
-        public boolean needBlock(long currentInvisibleTime) {
+        public boolean needBlock(String attemptId, long currentInvisibleTime) {
             if (offsetList == null || offsetList.isEmpty()) {
                 return false;
             }
+            if (this.attemptId != null && this.attemptId.equals(attemptId)) {
+                return false;
+            }
             int num = offsetList.size();
             int i = 0;
             if (this.invisibleTime == null || this.invisibleTime <= 0) {
@@ -586,11 +600,15 @@ public class ConsumerOrderInfoManager extends ConfigManager {
          * @param prevOffsetConsumedCount the offset list of message
          */
         @JSONField(serialize = false, deserialize = false)
-        public void mergeOffsetConsumedCount(List<Long> preOffsetList, Map<Long, Integer> prevOffsetConsumedCount) {
+        public void mergeOffsetConsumedCount(String preAttemptId, List<Long> preOffsetList, Map<Long, Integer> prevOffsetConsumedCount) {
             Map<Long, Integer> offsetConsumedCount = new HashMap<>();
             if (prevOffsetConsumedCount == null) {
                 prevOffsetConsumedCount = new HashMap<>();
             }
+            if (preAttemptId != null && preAttemptId.equals(this.attemptId)) {
+                this.offsetConsumedCount = prevOffsetConsumedCount;
+                return;
+            }
             Set<Long> preQueueOffsetSet = new HashSet<>();
             for (int i = 0; i < preOffsetList.size(); i++) {
                 preQueueOffsetSet.add(getQueueOffset(preOffsetList, i));
@@ -619,6 +637,7 @@ public class ConsumerOrderInfoManager extends ConfigManager {
                 .add("offsetConsumedCount", offsetConsumedCount)
                 .add("lastConsumeTimestamp", lastConsumeTimestamp)
                 .add("commitOffsetBit", commitOffsetBit)
+                .add("attemptId", attemptId)
                 .toString();
         }
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 824ba48fc..fa1c0793e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -178,7 +178,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
                         this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
                             requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset);
                     }
-                    if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(),
+                    if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(),
                         requestHeader.getConsumerGroup(), requestHeader.getQueueId(), invisibleTime)) {
                         this.brokerController.getPopMessageProcessor().notifyMessageArriving(
                             requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 3b306ca2d..4be77468f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -264,7 +264,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
     }
 
     private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader requestHeader, int queueId) {
-        if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
+        if (this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
             return false;
         }
         String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 5fa4c586a..a89bbb156 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -416,7 +416,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             if (retryTopicConfig != null) {
                 for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
                     int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
-                    getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+                    getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
                         startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
                 }
             }
@@ -425,12 +425,12 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             // read all queue
             for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
                 int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
-                getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+                getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
                     startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
             }
         } else {
             int queueId = requestHeader.getQueueId();
-            getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+            getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
                 startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
         }
         // if not full , fetch retry again
@@ -440,7 +440,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             if (retryTopicConfig != null) {
                 for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
                     int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
-                    getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+                    getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
                         startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
                 }
             }
@@ -523,7 +523,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         return null;
     }
 
-    private CompletableFuture<Long> popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
+    private CompletableFuture<Long> popMsgFromQueue(String attemptId, boolean isRetry, GetMessageResult getMessageResult,
         PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
         Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
         StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
@@ -545,7 +545,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
             offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
                 true, lockKey, true);
-            if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic,
+            if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic,
                 requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
                 future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum);
                 return future;
@@ -618,7 +618,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                     BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(), attributes);
 
                     if (isOrder) {
-                        this.brokerController.getConsumerOrderInfoManager().update(isRetry, topic,
+                        this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(), isRetry, topic,
                             requestHeader.getConsumerGroup(),
                             queueId, popTime, requestHeader.getInvisibleTime(), result.getMessageQueueOffset(),
                             orderCountInfo);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
index e5033a05d..93689efa5 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
@@ -67,6 +67,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
     @Test
     public void testConsumeMessageThenNoAck() {
         consumerOrderInfoManager.update(
+            null,
             false,
             TOPIC,
             GROUP,
@@ -83,6 +84,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
     @Test
     public void testConsumeMessageThenAck() {
         consumerOrderInfoManager.update(
+            null,
             false,
             TOPIC,
             GROUP,
@@ -106,6 +108,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
     @Test
     public void testConsumeTheChangeInvisibleLonger() {
         consumerOrderInfoManager.update(
+            null,
             false,
             TOPIC,
             GROUP,
@@ -130,6 +133,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
     @Test
     public void testConsumeTheChangeInvisibleShorter() {
         consumerOrderInfoManager.update(
+            null,
             false,
             TOPIC,
             GROUP,
@@ -155,6 +159,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
     public void testRecover() {
         ConsumerOrderInfoManager savedConsumerOrderInfoManager = new ConsumerOrderInfoManager();
         savedConsumerOrderInfoManager.update(
+            null,
             false,
             TOPIC,
             GROUP,
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
index f260632c6..25b418c93 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.offset;
 
 import java.time.Duration;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -63,6 +64,7 @@ public class ConsumerOrderInfoManagerTest {
     @Test
     public void testCommitAndNext() {
         consumerOrderInfoManager.update(
+            null,
             false,
             TOPIC,
             GROUP,
@@ -82,6 +84,7 @@ public class ConsumerOrderInfoManagerTest {
         ));
         assertEncodeAndDecode();
         assertTrue(consumerOrderInfoManager.checkBlock(
+            null,
             TOPIC,
             GROUP,
             QUEUE_ID_0,
@@ -97,6 +100,7 @@ public class ConsumerOrderInfoManagerTest {
         ));
         assertEncodeAndDecode();
         assertFalse(consumerOrderInfoManager.checkBlock(
+            null,
             TOPIC,
             GROUP,
             QUEUE_ID_0,
@@ -110,6 +114,7 @@ public class ConsumerOrderInfoManagerTest {
             // consume three new messages
             StringBuilder orderInfoBuilder = new StringBuilder();
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -129,6 +134,7 @@ public class ConsumerOrderInfoManagerTest {
             // reconsume same messages
             StringBuilder orderInfoBuilder = new StringBuilder();
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -151,6 +157,7 @@ public class ConsumerOrderInfoManagerTest {
             // reconsume last two message
             StringBuilder orderInfoBuilder = new StringBuilder();
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -173,6 +180,7 @@ public class ConsumerOrderInfoManagerTest {
             // consume a new message and reconsume last message
             StringBuilder orderInfoBuilder = new StringBuilder();
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -193,6 +201,7 @@ public class ConsumerOrderInfoManagerTest {
             // consume two new messages
             StringBuilder orderInfoBuilder = new StringBuilder();
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -215,6 +224,7 @@ public class ConsumerOrderInfoManagerTest {
             // consume two new messages
             StringBuilder orderInfoBuilder = new StringBuilder();
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -225,6 +235,7 @@ public class ConsumerOrderInfoManagerTest {
                 orderInfoBuilder
             );
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -244,6 +255,7 @@ public class ConsumerOrderInfoManagerTest {
             // reconsume two message
             StringBuilder orderInfoBuilder = new StringBuilder();
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -254,6 +266,7 @@ public class ConsumerOrderInfoManagerTest {
                 orderInfoBuilder
             );
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -275,6 +288,7 @@ public class ConsumerOrderInfoManagerTest {
             // reconsume with a new message
             StringBuilder orderInfoBuilder = new StringBuilder();
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -285,6 +299,7 @@ public class ConsumerOrderInfoManagerTest {
                 orderInfoBuilder
             );
             consumerOrderInfoManager.update(
+                null,
                 false,
                 TOPIC,
                 GROUP,
@@ -311,6 +326,7 @@ public class ConsumerOrderInfoManagerTest {
 
         StringBuilder orderInfoBuilder = new StringBuilder();
         consumerOrderInfoManager.update(
+            null,
             false,
             TOPIC,
             GROUP,
@@ -329,10 +345,11 @@ public class ConsumerOrderInfoManagerTest {
         assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 3L, popTime));
         assertEncodeAndDecode();
 
-        await().atMost(Duration.ofSeconds(invisibleTime + 1)).until(() -> !consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+        await().atMost(Duration.ofSeconds(invisibleTime + 1)).until(() -> !consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
 
         orderInfoBuilder = new StringBuilder();
         consumerOrderInfoManager.update(
+            null,
             false,
             TOPIC,
             GROUP,
@@ -350,11 +367,11 @@ public class ConsumerOrderInfoManagerTest {
         assertEncodeAndDecode();
         assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 4L, popTime));
         assertEncodeAndDecode();
-        assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+        assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
 
         assertEquals(5L, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 2L, popTime));
         assertEncodeAndDecode();
-        assertFalse(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+        assertFalse(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
     }
 
     @Test
@@ -377,7 +394,7 @@ public class ConsumerOrderInfoManagerTest {
         ConsumerOrderInfoManager consumerOrderInfoManager = new ConsumerOrderInfoManager(brokerController);
 
         {
-            consumerOrderInfoManager.update(false,
+            consumerOrderInfoManager.update(null, false,
                 "errTopic",
                 "errGroup",
                 QUEUE_ID_0,
@@ -390,7 +407,7 @@ public class ConsumerOrderInfoManagerTest {
             assertEquals(0, consumerOrderInfoManager.getTable().size());
         }
         {
-            consumerOrderInfoManager.update(false,
+            consumerOrderInfoManager.update(null, false,
                 TOPIC,
                 "errGroup",
                 QUEUE_ID_0,
@@ -404,7 +421,7 @@ public class ConsumerOrderInfoManagerTest {
         }
         {
             topicConfig.setReadQueueNums(0);
-            consumerOrderInfoManager.update(false,
+            consumerOrderInfoManager.update(null, false,
                 TOPIC,
                 GROUP,
                 QUEUE_ID_0,
@@ -420,7 +437,7 @@ public class ConsumerOrderInfoManagerTest {
         }
         {
             topicConfig.setReadQueueNums(8);
-            consumerOrderInfoManager.update(false,
+            consumerOrderInfoManager.update(null, false,
                 TOPIC,
                 GROUP,
                 QUEUE_ID_0,
@@ -461,7 +478,7 @@ public class ConsumerOrderInfoManagerTest {
 
     @Test
     public void testLoadFromOldVersionOrderInfoData() {
-        consumerOrderInfoManager.update(false,
+        consumerOrderInfoManager.update(null, false,
             TOPIC,
             GROUP,
             QUEUE_ID_0,
@@ -479,10 +496,10 @@ public class ConsumerOrderInfoManagerTest {
         String dataEncoded = consumerOrderInfoManager.encode();
 
         consumerOrderInfoManager.decode(dataEncoded);
-        assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, 3000));
+        assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, 3000));
 
         StringBuilder orderInfoBuilder = new StringBuilder();
-        consumerOrderInfoManager.update(false,
+        consumerOrderInfoManager.update(null, false,
             TOPIC,
             GROUP,
             QUEUE_ID_0,
@@ -497,4 +514,24 @@ public class ConsumerOrderInfoManagerTest {
         assertEquals(1, orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 3)).intValue());
         assertEquals(1, orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 4)).intValue());
     }
+
+    @Test
+    public void testReentrant() {
+        StringBuilder orderInfoBuilder = new StringBuilder();
+        String attemptId = UUID.randomUUID().toString();
+        consumerOrderInfoManager.update(
+            attemptId,
+            false,
+            TOPIC,
+            GROUP,
+            QUEUE_ID_0,
+            popTime,
+            3000,
+            Lists.newArrayList(1L, 2L, 3L),
+            orderInfoBuilder
+        );
+
+        assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, 3000));
+        assertFalse(consumerOrderInfoManager.checkBlock(attemptId, TOPIC, GROUP, QUEUE_ID_0, 3000));
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java
index 2460a4f2e..34b97987d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java
@@ -50,6 +50,8 @@ public class PopMessageRequestHeader extends TopicQueueRequestHeader {
      */
     private Boolean order = Boolean.FALSE;
 
+    private String attemptId;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -154,6 +156,14 @@ public class PopMessageRequestHeader extends TopicQueueRequestHeader {
         return this.order != null && this.order.booleanValue();
     }
 
+    public String getAttemptId() {
+        return attemptId;
+    }
+
+    public void setAttemptId(String attemptId) {
+        this.attemptId = attemptId;
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
@@ -168,6 +178,7 @@ public class PopMessageRequestHeader extends TopicQueueRequestHeader {
             .add("expType", expType)
             .add("exp", exp)
             .add("order", order)
+            .add("attemptId", attemptId)
             .toString();
     }
 }
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index b0c8c3250..85dfa7b49 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -69,6 +69,12 @@ public class RMQPopClient implements MQConsumer {
     public CompletableFuture<PopResult> popMessageAsync(String brokerAddr, MessageQueue mq, long invisibleTime,
         int maxNums, String consumerGroup, long timeout, boolean poll, int initMode, boolean order,
         String expressionType, String expression) {
+        return popMessageAsync(brokerAddr, mq, invisibleTime, maxNums, consumerGroup, timeout, poll, initMode, order, expressionType, expression, null);
+    }
+
+    public CompletableFuture<PopResult> popMessageAsync(String brokerAddr, MessageQueue mq, long invisibleTime,
+        int maxNums, String consumerGroup, long timeout, boolean poll, int initMode, boolean order,
+        String expressionType, String expression, String attemptId) {
         PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setTopic(mq.getTopic());
@@ -79,6 +85,7 @@ public class RMQPopClient implements MQConsumer {
         requestHeader.setExpType(expressionType);
         requestHeader.setExp(expression);
         requestHeader.setOrder(order);
+        requestHeader.setAttemptId(attemptId);
         if (poll) {
             requestHeader.setPollTime(timeout);
             requestHeader.setBornTime(System.currentTimeMillis());
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
index ecd70c134..acf70f7f9 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
@@ -95,6 +95,19 @@ public class BasePopOrderly extends BasePop {
         }
     }
 
+    protected void assertMsgRecv(int seqId, int expectNum, List<Integer> expectReconsumeTimes) {
+        String msgId = msgRecvSequence.get(seqId);
+        List<MsgRcv> msgRcvList = msgRecv.get(msgId);
+        assertEquals(expectNum, msgRcvList.size());
+        assertConsumeTimes(msgRcvList, expectReconsumeTimes);
+    }
+
+    protected void assertConsumeTimes(List<MsgRcv> msgRcvList, List<Integer> expectReconsumeTimes) {
+        for (int i = 0; i < msgRcvList.size(); i++) {
+            assertEquals(expectReconsumeTimes.get(i).intValue(), msgRcvList.get(i).messageExt.getReconsumeTimes());
+        }
+    }
+
     protected void onRecvNewMessage(MessageExt messageExt) {
         msgDataRecv.add(new String(messageExt.getBody()));
         msgRecvSequence.add(messageExt.getMsgId());
@@ -108,9 +121,13 @@ public class BasePopOrderly extends BasePop {
     }
 
     protected CompletableFuture<PopResult> popMessageOrderlyAsync(long invisibleTime, int maxNums, long timeout) {
+        return popMessageOrderlyAsync(invisibleTime, maxNums, timeout, null);
+    }
+
+    protected CompletableFuture<PopResult> popMessageOrderlyAsync(long invisibleTime, int maxNums, long timeout, String attemptId) {
         return client.popMessageAsync(
             brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, true,
-            ConsumeInitMode.MIN, true, ExpressionType.TAG, "*");
+            ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", attemptId);
     }
 
     protected CompletableFuture<AckResult> ackMessageAsync(MessageExt messageExt) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java
index 04c7f4a34..efb12a321 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.PopResult;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.assertj.core.util.Lists;
 import org.junit.Test;
 
 import static org.awaitility.Awaitility.await;
@@ -248,4 +249,41 @@ public class PopOrderlyIT extends BasePopOrderly {
         });
         return resultFuture;
     }
+
+    @Test
+    public void testReentrant() {
+        producer.send(1);
+
+        popMessageForReentrant(null).join();
+        assertMsgRecv(0, 1, Lists.newArrayList(0));
+
+        String attemptId01 = "attemptId-01";
+        popMessageForReentrant(attemptId01).join();
+        assertMsgRecv(0, 2, Lists.newArrayList(0, 1));
+        popMessageForReentrant(attemptId01).join();
+        assertMsgRecv(0, 3, Lists.newArrayList(0, 1, 1));
+
+        String attemptId02 = "attemptId-02";
+        await().atLeast(Duration.ofSeconds(5)).atMost(Duration.ofSeconds(15)).until(() -> {
+            popMessageForReentrant(attemptId02).join();
+            return msgRecvSequence.size() == 4;
+        });
+        popMessageForReentrant(attemptId02).join();
+        assertMsgRecv(0, 5, Lists.newArrayList(0, 1, 1, 2, 2));
+
+        await().atLeast(Duration.ofSeconds(5)).atMost(Duration.ofSeconds(15)).until(() -> {
+            popMessageForReentrant(null).join();
+            return msgRecvSequence.size() == 6;
+        });
+        assertMsgRecv(0, 6, Lists.newArrayList(0, 1, 1, 2, 2, 3));
+    }
+
+    private CompletableFuture<Void> popMessageForReentrant(String attemptId) {
+        return popMessageOrderlyAsync(TimeUnit.SECONDS.toMillis(10), 3, TimeUnit.SECONDS.toMillis(30), attemptId)
+            .thenAccept(popResult -> {
+                for (MessageExt messageExt : popResult.getMsgFoundList()) {
+                    onRecvNewMessage(messageExt);
+                }
+            });
+    }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
index cedc0fe2a..b9798cfd5 100644
--- a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
@@ -155,12 +155,12 @@ public class OffsetResetForPopIT extends BaseConf {
         // ack old msg, expect has no effect
         ackMessageSync(popResult1.getMsgFoundList());
         Assert.assertTrue(brokerController1.getConsumerOrderInfoManager()
-            .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+            .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
 
         // ack new msg
         ackMessageSync(popResult2.getMsgFoundList());
         Assert.assertFalse(brokerController1.getConsumerOrderInfoManager()
-            .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+            .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
     }
 
     @Test
@@ -176,12 +176,12 @@ public class OffsetResetForPopIT extends BaseConf {
         PopResult popResult = consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
         Assert.assertEquals(messageCount - resetOffset, popResult.getMsgFoundList().size());
         Assert.assertTrue(brokerController1.getConsumerOrderInfoManager()
-            .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+            .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
 
         ackMessageSync(popResult.getMsgFoundList());
         TimeUnit.SECONDS.sleep(1);
         Assert.assertFalse(brokerController1.getConsumerOrderInfoManager()
-            .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+            .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
     }
 
     @Test