You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/05/05 07:06:19 UTC
[rocketmq] branch develop updated: [ISSUE #6691] Support reentrant pop orderly for broker (#6692)
This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 57642bc63 [ISSUE #6691] Support reentrant pop orderly for broker (#6692)
57642bc63 is described below
commit 57642bc630d5ee42cca026ae389ae3016a61bb9c
Author: lk <xd...@outlook.com>
AuthorDate: Fri May 5 15:06:11 2023 +0800
[ISSUE #6691] Support reentrant pop orderly for broker (#6692)
---
.../broker/offset/ConsumerOrderInfoManager.java | 37 ++++++++++----
.../broker/processor/AckMessageProcessor.java | 2 +-
.../broker/processor/NotificationProcessor.java | 2 +-
.../broker/processor/PopMessageProcessor.java | 14 +++---
...ConsumerOrderInfoManagerLockFreeNotifyTest.java | 5 ++
.../offset/ConsumerOrderInfoManagerTest.java | 57 ++++++++++++++++++----
.../protocol/header/PopMessageRequestHeader.java | 11 +++++
.../rocketmq/test/client/rmq/RMQPopClient.java | 7 +++
.../test/client/consumer/pop/BasePopOrderly.java | 19 +++++++-
.../test/client/consumer/pop/PopOrderlyIT.java | 38 +++++++++++++++
.../rocketmq/test/offset/OffsetResetForPopIT.java | 8 +--
11 files changed, 167 insertions(+), 33 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 29bbe9970..2e2850dbb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -91,7 +91,7 @@ public class ConsumerOrderInfoManager extends ConfigManager {
* @param msgQueueOffsetList the queue offsets of messages
* @param orderInfoBuilder will append order info to this builder
*/
- public void update(boolean isRetry, String topic, String group, int queueId, long popTime, long invisibleTime,
+ public void update(String attemptId, boolean isRetry, String topic, String group, int queueId, long popTime, long invisibleTime,
List<Long> msgQueueOffsetList, StringBuilder orderInfoBuilder) {
String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
@@ -106,12 +106,12 @@ public class ConsumerOrderInfoManager extends ConfigManager {
OrderInfo orderInfo = qs.get(queueId);
if (orderInfo != null) {
- OrderInfo newOrderInfo = new OrderInfo(popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
- newOrderInfo.mergeOffsetConsumedCount(orderInfo.offsetList, orderInfo.offsetConsumedCount);
+ OrderInfo newOrderInfo = new OrderInfo(attemptId, popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
+ newOrderInfo.mergeOffsetConsumedCount(orderInfo.attemptId, orderInfo.offsetList, orderInfo.offsetConsumedCount);
orderInfo = newOrderInfo;
} else {
- orderInfo = new OrderInfo(popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
+ orderInfo = new OrderInfo(attemptId, popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
}
qs.put(queueId, orderInfo);
@@ -140,7 +140,7 @@ public class ConsumerOrderInfoManager extends ConfigManager {
updateLockFreeTimestamp(topic, group, queueId, orderInfo);
}
- public boolean checkBlock(String topic, String group, int queueId, long invisibleTime) {
+ public boolean checkBlock(String attemptId, String topic, String group, int queueId, long invisibleTime) {
String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
if (qs == null) {
@@ -156,7 +156,7 @@ public class ConsumerOrderInfoManager extends ConfigManager {
if (orderInfo == null) {
return false;
}
- return orderInfo.needBlock(invisibleTime);
+ return orderInfo.needBlock(attemptId, invisibleTime);
}
public void clearBlock(String topic, String group, int queueId) {
@@ -391,17 +391,20 @@ public class ConsumerOrderInfoManager extends ConfigManager {
*/
@JSONField(name = "cm")
private long commitOffsetBit;
+ @JSONField(name = "a")
+ private String attemptId;
public OrderInfo() {
}
- public OrderInfo(long popTime, long invisibleTime, List<Long> queueOffsetList, long lastConsumeTimestamp,
+ public OrderInfo(String attemptId, long popTime, long invisibleTime, List<Long> queueOffsetList, long lastConsumeTimestamp,
long commitOffsetBit) {
this.popTime = popTime;
this.invisibleTime = invisibleTime;
this.offsetList = buildOffsetList(queueOffsetList);
this.lastConsumeTimestamp = lastConsumeTimestamp;
this.commitOffsetBit = commitOffsetBit;
+ this.attemptId = attemptId;
}
public List<Long> getOffsetList() {
@@ -460,6 +463,14 @@ public class ConsumerOrderInfoManager extends ConfigManager {
this.offsetConsumedCount = offsetConsumedCount;
}
+ public String getAttemptId() {
+ return attemptId;
+ }
+
+ public void setAttemptId(String attemptId) {
+ this.attemptId = attemptId;
+ }
+
public static List<Long> buildOffsetList(List<Long> queueOffsetList) {
List<Long> simple = new ArrayList<>();
if (queueOffsetList.size() == 1) {
@@ -475,10 +486,13 @@ public class ConsumerOrderInfoManager extends ConfigManager {
}
@JSONField(serialize = false, deserialize = false)
- public boolean needBlock(long currentInvisibleTime) {
+ public boolean needBlock(String attemptId, long currentInvisibleTime) {
if (offsetList == null || offsetList.isEmpty()) {
return false;
}
+ if (this.attemptId != null && this.attemptId.equals(attemptId)) {
+ return false;
+ }
int num = offsetList.size();
int i = 0;
if (this.invisibleTime == null || this.invisibleTime <= 0) {
@@ -586,11 +600,15 @@ public class ConsumerOrderInfoManager extends ConfigManager {
* @param prevOffsetConsumedCount the offset list of message
*/
@JSONField(serialize = false, deserialize = false)
- public void mergeOffsetConsumedCount(List<Long> preOffsetList, Map<Long, Integer> prevOffsetConsumedCount) {
+ public void mergeOffsetConsumedCount(String preAttemptId, List<Long> preOffsetList, Map<Long, Integer> prevOffsetConsumedCount) {
Map<Long, Integer> offsetConsumedCount = new HashMap<>();
if (prevOffsetConsumedCount == null) {
prevOffsetConsumedCount = new HashMap<>();
}
+ if (preAttemptId != null && preAttemptId.equals(this.attemptId)) {
+ this.offsetConsumedCount = prevOffsetConsumedCount;
+ return;
+ }
Set<Long> preQueueOffsetSet = new HashSet<>();
for (int i = 0; i < preOffsetList.size(); i++) {
preQueueOffsetSet.add(getQueueOffset(preOffsetList, i));
@@ -619,6 +637,7 @@ public class ConsumerOrderInfoManager extends ConfigManager {
.add("offsetConsumedCount", offsetConsumedCount)
.add("lastConsumeTimestamp", lastConsumeTimestamp)
.add("commitOffsetBit", commitOffsetBit)
+ .add("attemptId", attemptId)
.toString();
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 824ba48fc..fa1c0793e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -178,7 +178,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset);
}
- if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(),
+ if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getQueueId(), invisibleTime)) {
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 3b306ca2d..4be77468f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -264,7 +264,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
}
private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader requestHeader, int queueId) {
- if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
+ if (this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
return false;
}
String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 5fa4c586a..a89bbb156 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -416,7 +416,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
if (retryTopicConfig != null) {
for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
- getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
}
@@ -425,12 +425,12 @@ public class PopMessageProcessor implements NettyRequestProcessor {
// read all queue
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
- getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
} else {
int queueId = requestHeader.getQueueId();
- getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
// if not full , fetch retry again
@@ -440,7 +440,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
if (retryTopicConfig != null) {
for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
- getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
}
@@ -523,7 +523,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
return null;
}
- private CompletableFuture<Long> popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
+ private CompletableFuture<Long> popMsgFromQueue(String attemptId, boolean isRetry, GetMessageResult getMessageResult,
PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
@@ -545,7 +545,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
true, lockKey, true);
- if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic,
+ if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic,
requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum);
return future;
@@ -618,7 +618,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(), attributes);
if (isOrder) {
- this.brokerController.getConsumerOrderInfoManager().update(isRetry, topic,
+ this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(), isRetry, topic,
requestHeader.getConsumerGroup(),
queueId, popTime, requestHeader.getInvisibleTime(), result.getMessageQueueOffset(),
orderCountInfo);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
index e5033a05d..93689efa5 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
@@ -67,6 +67,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
@Test
public void testConsumeMessageThenNoAck() {
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -83,6 +84,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
@Test
public void testConsumeMessageThenAck() {
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -106,6 +108,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
@Test
public void testConsumeTheChangeInvisibleLonger() {
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -130,6 +133,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
@Test
public void testConsumeTheChangeInvisibleShorter() {
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -155,6 +159,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
public void testRecover() {
ConsumerOrderInfoManager savedConsumerOrderInfoManager = new ConsumerOrderInfoManager();
savedConsumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
index f260632c6..25b418c93 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.offset;
import java.time.Duration;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -63,6 +64,7 @@ public class ConsumerOrderInfoManagerTest {
@Test
public void testCommitAndNext() {
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -82,6 +84,7 @@ public class ConsumerOrderInfoManagerTest {
));
assertEncodeAndDecode();
assertTrue(consumerOrderInfoManager.checkBlock(
+ null,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -97,6 +100,7 @@ public class ConsumerOrderInfoManagerTest {
));
assertEncodeAndDecode();
assertFalse(consumerOrderInfoManager.checkBlock(
+ null,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -110,6 +114,7 @@ public class ConsumerOrderInfoManagerTest {
// consume three new messages
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -129,6 +134,7 @@ public class ConsumerOrderInfoManagerTest {
// reconsume same messages
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -151,6 +157,7 @@ public class ConsumerOrderInfoManagerTest {
// reconsume last two message
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -173,6 +180,7 @@ public class ConsumerOrderInfoManagerTest {
// consume a new message and reconsume last message
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -193,6 +201,7 @@ public class ConsumerOrderInfoManagerTest {
// consume two new messages
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -215,6 +224,7 @@ public class ConsumerOrderInfoManagerTest {
// consume two new messages
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -225,6 +235,7 @@ public class ConsumerOrderInfoManagerTest {
orderInfoBuilder
);
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -244,6 +255,7 @@ public class ConsumerOrderInfoManagerTest {
// reconsume two message
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -254,6 +266,7 @@ public class ConsumerOrderInfoManagerTest {
orderInfoBuilder
);
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -275,6 +288,7 @@ public class ConsumerOrderInfoManagerTest {
// reconsume with a new message
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -285,6 +299,7 @@ public class ConsumerOrderInfoManagerTest {
orderInfoBuilder
);
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -311,6 +326,7 @@ public class ConsumerOrderInfoManagerTest {
StringBuilder orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -329,10 +345,11 @@ public class ConsumerOrderInfoManagerTest {
assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 3L, popTime));
assertEncodeAndDecode();
- await().atMost(Duration.ofSeconds(invisibleTime + 1)).until(() -> !consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+ await().atMost(Duration.ofSeconds(invisibleTime + 1)).until(() -> !consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
orderInfoBuilder = new StringBuilder();
consumerOrderInfoManager.update(
+ null,
false,
TOPIC,
GROUP,
@@ -350,11 +367,11 @@ public class ConsumerOrderInfoManagerTest {
assertEncodeAndDecode();
assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 4L, popTime));
assertEncodeAndDecode();
- assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+ assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
assertEquals(5L, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 2L, popTime));
assertEncodeAndDecode();
- assertFalse(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+ assertFalse(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
}
@Test
@@ -377,7 +394,7 @@ public class ConsumerOrderInfoManagerTest {
ConsumerOrderInfoManager consumerOrderInfoManager = new ConsumerOrderInfoManager(brokerController);
{
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
"errTopic",
"errGroup",
QUEUE_ID_0,
@@ -390,7 +407,7 @@ public class ConsumerOrderInfoManagerTest {
assertEquals(0, consumerOrderInfoManager.getTable().size());
}
{
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
TOPIC,
"errGroup",
QUEUE_ID_0,
@@ -404,7 +421,7 @@ public class ConsumerOrderInfoManagerTest {
}
{
topicConfig.setReadQueueNums(0);
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -420,7 +437,7 @@ public class ConsumerOrderInfoManagerTest {
}
{
topicConfig.setReadQueueNums(8);
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -461,7 +478,7 @@ public class ConsumerOrderInfoManagerTest {
@Test
public void testLoadFromOldVersionOrderInfoData() {
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -479,10 +496,10 @@ public class ConsumerOrderInfoManagerTest {
String dataEncoded = consumerOrderInfoManager.encode();
consumerOrderInfoManager.decode(dataEncoded);
- assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, 3000));
+ assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, 3000));
StringBuilder orderInfoBuilder = new StringBuilder();
- consumerOrderInfoManager.update(false,
+ consumerOrderInfoManager.update(null, false,
TOPIC,
GROUP,
QUEUE_ID_0,
@@ -497,4 +514,24 @@ public class ConsumerOrderInfoManagerTest {
assertEquals(1, orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 3)).intValue());
assertEquals(1, orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 4)).intValue());
}
+
+ @Test
+ public void testReentrant() {
+ StringBuilder orderInfoBuilder = new StringBuilder();
+ String attemptId = UUID.randomUUID().toString();
+ consumerOrderInfoManager.update(
+ attemptId,
+ false,
+ TOPIC,
+ GROUP,
+ QUEUE_ID_0,
+ popTime,
+ 3000,
+ Lists.newArrayList(1L, 2L, 3L),
+ orderInfoBuilder
+ );
+
+ assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, 3000));
+ assertFalse(consumerOrderInfoManager.checkBlock(attemptId, TOPIC, GROUP, QUEUE_ID_0, 3000));
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java
index 2460a4f2e..34b97987d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java
@@ -50,6 +50,8 @@ public class PopMessageRequestHeader extends TopicQueueRequestHeader {
*/
private Boolean order = Boolean.FALSE;
+ private String attemptId;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -154,6 +156,14 @@ public class PopMessageRequestHeader extends TopicQueueRequestHeader {
return this.order != null && this.order.booleanValue();
}
+ public String getAttemptId() {
+ return attemptId;
+ }
+
+ public void setAttemptId(String attemptId) {
+ this.attemptId = attemptId;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -168,6 +178,7 @@ public class PopMessageRequestHeader extends TopicQueueRequestHeader {
.add("expType", expType)
.add("exp", exp)
.add("order", order)
+ .add("attemptId", attemptId)
.toString();
}
}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index b0c8c3250..85dfa7b49 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -69,6 +69,12 @@ public class RMQPopClient implements MQConsumer {
public CompletableFuture<PopResult> popMessageAsync(String brokerAddr, MessageQueue mq, long invisibleTime,
int maxNums, String consumerGroup, long timeout, boolean poll, int initMode, boolean order,
String expressionType, String expression) {
+ return popMessageAsync(brokerAddr, mq, invisibleTime, maxNums, consumerGroup, timeout, poll, initMode, order, expressionType, expression, null);
+ }
+
+ public CompletableFuture<PopResult> popMessageAsync(String brokerAddr, MessageQueue mq, long invisibleTime,
+ int maxNums, String consumerGroup, long timeout, boolean poll, int initMode, boolean order,
+ String expressionType, String expression, String attemptId) {
PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(mq.getTopic());
@@ -79,6 +85,7 @@ public class RMQPopClient implements MQConsumer {
requestHeader.setExpType(expressionType);
requestHeader.setExp(expression);
requestHeader.setOrder(order);
+ requestHeader.setAttemptId(attemptId);
if (poll) {
requestHeader.setPollTime(timeout);
requestHeader.setBornTime(System.currentTimeMillis());
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
index ecd70c134..acf70f7f9 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
@@ -95,6 +95,19 @@ public class BasePopOrderly extends BasePop {
}
}
+ protected void assertMsgRecv(int seqId, int expectNum, List<Integer> expectReconsumeTimes) {
+ String msgId = msgRecvSequence.get(seqId);
+ List<MsgRcv> msgRcvList = msgRecv.get(msgId);
+ assertEquals(expectNum, msgRcvList.size());
+ assertConsumeTimes(msgRcvList, expectReconsumeTimes);
+ }
+
+ protected void assertConsumeTimes(List<MsgRcv> msgRcvList, List<Integer> expectReconsumeTimes) {
+ for (int i = 0; i < msgRcvList.size(); i++) {
+ assertEquals(expectReconsumeTimes.get(i).intValue(), msgRcvList.get(i).messageExt.getReconsumeTimes());
+ }
+ }
+
protected void onRecvNewMessage(MessageExt messageExt) {
msgDataRecv.add(new String(messageExt.getBody()));
msgRecvSequence.add(messageExt.getMsgId());
@@ -108,9 +121,13 @@ public class BasePopOrderly extends BasePop {
}
protected CompletableFuture<PopResult> popMessageOrderlyAsync(long invisibleTime, int maxNums, long timeout) {
+ return popMessageOrderlyAsync(invisibleTime, maxNums, timeout, null);
+ }
+
+ protected CompletableFuture<PopResult> popMessageOrderlyAsync(long invisibleTime, int maxNums, long timeout, String attemptId) {
return client.popMessageAsync(
brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, true,
- ConsumeInitMode.MIN, true, ExpressionType.TAG, "*");
+ ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", attemptId);
}
protected CompletableFuture<AckResult> ackMessageAsync(MessageExt messageExt) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java
index 04c7f4a34..efb12a321 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.common.message.MessageExt;
+import org.assertj.core.util.Lists;
import org.junit.Test;
import static org.awaitility.Awaitility.await;
@@ -248,4 +249,41 @@ public class PopOrderlyIT extends BasePopOrderly {
});
return resultFuture;
}
+
+ @Test
+ public void testReentrant() {
+ producer.send(1);
+
+ popMessageForReentrant(null).join();
+ assertMsgRecv(0, 1, Lists.newArrayList(0));
+
+ String attemptId01 = "attemptId-01";
+ popMessageForReentrant(attemptId01).join();
+ assertMsgRecv(0, 2, Lists.newArrayList(0, 1));
+ popMessageForReentrant(attemptId01).join();
+ assertMsgRecv(0, 3, Lists.newArrayList(0, 1, 1));
+
+ String attemptId02 = "attemptId-02";
+ await().atLeast(Duration.ofSeconds(5)).atMost(Duration.ofSeconds(15)).until(() -> {
+ popMessageForReentrant(attemptId02).join();
+ return msgRecvSequence.size() == 4;
+ });
+ popMessageForReentrant(attemptId02).join();
+ assertMsgRecv(0, 5, Lists.newArrayList(0, 1, 1, 2, 2));
+
+ await().atLeast(Duration.ofSeconds(5)).atMost(Duration.ofSeconds(15)).until(() -> {
+ popMessageForReentrant(null).join();
+ return msgRecvSequence.size() == 6;
+ });
+ assertMsgRecv(0, 6, Lists.newArrayList(0, 1, 1, 2, 2, 3));
+ }
+
+ private CompletableFuture<Void> popMessageForReentrant(String attemptId) {
+ return popMessageOrderlyAsync(TimeUnit.SECONDS.toMillis(10), 3, TimeUnit.SECONDS.toMillis(30), attemptId)
+ .thenAccept(popResult -> {
+ for (MessageExt messageExt : popResult.getMsgFoundList()) {
+ onRecvNewMessage(messageExt);
+ }
+ });
+ }
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
index cedc0fe2a..b9798cfd5 100644
--- a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
@@ -155,12 +155,12 @@ public class OffsetResetForPopIT extends BaseConf {
// ack old msg, expect has no effect
ackMessageSync(popResult1.getMsgFoundList());
Assert.assertTrue(brokerController1.getConsumerOrderInfoManager()
- .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+ .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
// ack new msg
ackMessageSync(popResult2.getMsgFoundList());
Assert.assertFalse(brokerController1.getConsumerOrderInfoManager()
- .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+ .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
}
@Test
@@ -176,12 +176,12 @@ public class OffsetResetForPopIT extends BaseConf {
PopResult popResult = consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
Assert.assertEquals(messageCount - resetOffset, popResult.getMsgFoundList().size());
Assert.assertTrue(brokerController1.getConsumerOrderInfoManager()
- .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+ .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
ackMessageSync(popResult.getMsgFoundList());
TimeUnit.SECONDS.sleep(1);
Assert.assertFalse(brokerController1.getConsumerOrderInfoManager()
- .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+ .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
}
@Test