You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/21 03:16:18 UTC

[rocketmq] branch develop updated: [ISSUE #6771] Merge some cases in PullMessageProcessor#composeResponseHeader method (#6772)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7c24daf205 [ISSUE #6771] Merge some cases in PullMessageProcessor#composeResponseHeader method (#6772)
7c24daf205 is described below

commit 7c24daf205cb25068049fd29a02bff89e2b81b3b
Author: mxsm <lj...@gmail.com>
AuthorDate: Sun May 21 11:15:55 2023 +0800

    [ISSUE #6771] Merge some cases in PullMessageProcessor#composeResponseHeader method (#6772)
---
 .../rocketmq/broker/processor/PullMessageProcessor.java | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 9286cf913d..8df2265c2c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -555,6 +555,15 @@ public class PullMessageProcessor implements NettyRequestProcessor {
         return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
     }
 
+    /**
+     * Composes the header of the response message to be sent back to the client
+     * @param requestHeader - the header of the request message
+     * @param getMessageResult - the result of the GetMessage request
+     * @param topicSysFlag - the system flag of the topic
+     * @param subscriptionGroupConfig - configuration of the subscription group
+     * @param response - the response message to be sent back to the client
+     * @param clientAddress - the address of the client
+     */
     protected void composeResponseHeader(PullMessageRequestHeader requestHeader, GetMessageResult getMessageResult,
         int topicSysFlag, SubscriptionGroupConfig subscriptionGroupConfig, RemotingCommand response,
         String clientAddress) {
@@ -572,6 +581,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 response.setCode(ResponseCode.SUCCESS);
                 break;
             case MESSAGE_WAS_REMOVING:
+            case NO_MATCHED_MESSAGE:
                 response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                 break;
             case NO_MATCHED_LOGIC_QUEUE:
@@ -590,10 +600,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                     response.setCode(ResponseCode.PULL_NOT_FOUND);
                 }
                 break;
-            case NO_MATCHED_MESSAGE:
-                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
-                break;
             case OFFSET_FOUND_NULL:
+            case OFFSET_OVERFLOW_ONE:
                 response.setCode(ResponseCode.PULL_NOT_FOUND);
                 break;
             case OFFSET_OVERFLOW_BADLY:
@@ -602,9 +610,6 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 LOGGER.info("the request offset: {} over flow badly, fix to {}, broker max offset: {}, consumer: {}",
                     requestHeader.getQueueOffset(), getMessageResult.getNextBeginOffset(), getMessageResult.getMaxOffset(), clientAddress);
                 break;
-            case OFFSET_OVERFLOW_ONE:
-                response.setCode(ResponseCode.PULL_NOT_FOUND);
-                break;
             case OFFSET_RESET:
                 response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                 LOGGER.info("The queue under pulling was previously reset to start from {}",