You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/05/16 03:32:19 UTC
[rocketmq] branch develop updated: [ISSUE #6761] Support reentrant notification orderly for broker (#6762)
This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9dfd4a959 [ISSUE #6761] Support reentrant notification orderly for broker (#6762)
9dfd4a959 is described below
commit 9dfd4a95997bfe0aa0ab6d812f6b4dd01509d8d5
Author: lk <xd...@outlook.com>
AuthorDate: Tue May 16 11:32:11 2023 +0800
[ISSUE #6761] Support reentrant notification orderly for broker (#6762)
---
.../broker/processor/NotificationProcessor.java | 6 ++++--
.../protocol/header/NotificationRequestHeader.java | 18 ++++++++++++++++++
.../apache/rocketmq/test/client/rmq/RMQPopClient.java | 7 +++++++
.../test/client/consumer/pop/NotificationIT.java | 19 +++++++++++++++++++
4 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index d07aadfdb..a15340383 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -174,8 +174,10 @@ public class NotificationProcessor implements NettyRequestProcessor {
}
private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader requestHeader, int queueId) {
- if (this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
- return false;
+ if (Boolean.TRUE.equals(requestHeader.getOrder())) {
+ if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(), requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
+ return false;
+ }
}
String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic();
long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
index 7ac6c500e..5965e9dcb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
@@ -33,6 +33,9 @@ public class NotificationRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private long bornTime;
+ private Boolean order = Boolean.FALSE;
+ private String attemptId;
+
@CFNotNull
@Override
public void checkFields() throws RemotingCommandException {
@@ -81,4 +84,19 @@ public class NotificationRequestHeader extends TopicQueueRequestHeader {
this.queueId = queueId;
}
+ public Boolean getOrder() {
+ return order;
+ }
+
+ public void setOrder(Boolean order) {
+ this.order = order;
+ }
+
+ public String getAttemptId() {
+ return attemptId;
+ }
+
+ public void setAttemptId(String attemptId) {
+ this.attemptId = attemptId;
+ }
}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index 74d834681..496bd6da4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -172,12 +172,19 @@ public class RMQPopClient implements MQConsumer {
public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
String consumerGroup, int queueId, long pollTime, long bornTime, long timeoutMillis) {
+ return notification(brokerAddr, topic, consumerGroup, queueId, null, null, pollTime, bornTime, timeoutMillis);
+ }
+
+ public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
+ String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis) {
NotificationRequestHeader requestHeader = new NotificationRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setPollTime(pollTime);
requestHeader.setBornTime(bornTime);
+ requestHeader.setOrder(order);
+ requestHeader.setAttemptId(attemptId);
return this.mqClientAPI.notification(brokerAddr, requestHeader, timeoutMillis);
}
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/NotificationIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/NotificationIT.java
index af6f499cd..072159599 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/NotificationIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/NotificationIT.java
@@ -66,6 +66,25 @@ public class NotificationIT extends BasePop {
assertThat(result2).isFalse();
}
+ @Test
+ public void testNotificationOrderly() throws Exception {
+ long pollTime = 500;
+ String attemptId = "attemptId";
+ CompletableFuture<Boolean> future1 = client.notification(brokerAddr, topic, group, messageQueue.getQueueId(), true, attemptId, pollTime, System.currentTimeMillis(), 5000);
+ CompletableFuture<Boolean> future2 = client.notification(brokerAddr, topic, group, messageQueue.getQueueId(), true, attemptId, pollTime, System.currentTimeMillis(), 5000);
+ sendMessage(1);
+ Boolean result1 = future1.get();
+ assertThat(result1).isTrue();
+ client.popMessageAsync(brokerAddr, messageQueue, 10000, 1, group, 1000, false,
+ ConsumeInitMode.MIN, true, null, null, attemptId);
+ Boolean result2 = future2.get();
+ assertThat(result2).isTrue();
+
+ String attemptId2 = "attemptId2";
+ CompletableFuture<Boolean> future3 = client.notification(brokerAddr, topic, group, messageQueue.getQueueId(), true, attemptId2, pollTime, System.currentTimeMillis(), 5000);
+ assertThat(future3.get()).isFalse();
+ }
+
protected void sendMessage(int num) {
MessageQueueMsg mqMsgs = new MessageQueueMsg(Lists.newArrayList(messageQueue), num);
producer.send(mqMsgs.getMsgsWithMQ());