You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/21 03:16:18 UTC
[rocketmq] branch develop updated: [ISSUE #6771] Merge some cases in PullMessageProcessor#composeResponseHeader method (#6772)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7c24daf205 [ISSUE #6771] Merge some cases in PullMessageProcessor#composeResponseHeader method (#6772)
7c24daf205 is described below
commit 7c24daf205cb25068049fd29a02bff89e2b81b3b
Author: mxsm <lj...@gmail.com>
AuthorDate: Sun May 21 11:15:55 2023 +0800
[ISSUE #6771] Merge some cases in PullMessageProcessor#composeResponseHeader method (#6772)
---
.../rocketmq/broker/processor/PullMessageProcessor.java | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 9286cf913d..8df2265c2c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -555,6 +555,15 @@ public class PullMessageProcessor implements NettyRequestProcessor {
return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
}
+ /**
+ * Composes the header of the response message to be sent back to the client
+ * @param requestHeader - the header of the request message
+ * @param getMessageResult - the result of the GetMessage request
+ * @param topicSysFlag - the system flag of the topic
+ * @param subscriptionGroupConfig - configuration of the subscription group
+ * @param response - the response message to be sent back to the client
+ * @param clientAddress - the address of the client
+ */
protected void composeResponseHeader(PullMessageRequestHeader requestHeader, GetMessageResult getMessageResult,
int topicSysFlag, SubscriptionGroupConfig subscriptionGroupConfig, RemotingCommand response,
String clientAddress) {
@@ -572,6 +581,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response.setCode(ResponseCode.SUCCESS);
break;
case MESSAGE_WAS_REMOVING:
+ case NO_MATCHED_MESSAGE:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_MATCHED_LOGIC_QUEUE:
@@ -590,10 +600,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response.setCode(ResponseCode.PULL_NOT_FOUND);
}
break;
- case NO_MATCHED_MESSAGE:
- response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
- break;
case OFFSET_FOUND_NULL:
+ case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_OVERFLOW_BADLY:
@@ -602,9 +610,6 @@ public class PullMessageProcessor implements NettyRequestProcessor {
LOGGER.info("the request offset: {} over flow badly, fix to {}, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(), getMessageResult.getNextBeginOffset(), getMessageResult.getMaxOffset(), clientAddress);
break;
- case OFFSET_OVERFLOW_ONE:
- response.setCode(ResponseCode.PULL_NOT_FOUND);
- break;
case OFFSET_RESET:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
LOGGER.info("The queue under pulling was previously reset to start from {}",