You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:59 UTC

[rocketmq] 18/26: [ISSUE #5406] Overwrite sysFlag to broker

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 9897e74a6c38200f6200c7bc69c7a118ec4ad7fd
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Fri Nov 18 15:46:44 2022 +0800

    [ISSUE #5406] Overwrite sysFlag to broker
---
 .../src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java | 4 ++++
 .../apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java  | 4 +++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
index 20b8ad208..15d56dde7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
@@ -77,6 +77,10 @@ public class PullSysFlag {
         return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
     }
 
+    public static int buildSysFlagWithSubscription(final int sysFlag) {
+        return sysFlag | FLAG_SUBSCRIPTION;
+    }
+
     public static boolean hasClassFilterFlag(final int sysFlag) {
         return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
index 873b52460..eb744676a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
@@ -39,7 +39,8 @@ public class PullMessageActivity extends AbstractRemotingActivity {
     protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
         ProxyContext context) throws Exception {
         PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-        if (!PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())) {
+        int sysFlag = requestHeader.getSysFlag();
+        if (!PullSysFlag.hasSubscriptionFlag(sysFlag)) {
             ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup());
             if (consumerInfo == null) {
                 return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST,
@@ -50,6 +51,7 @@ public class PullMessageActivity extends AbstractRemotingActivity {
                 return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_EXIST,
                     "the consumer's subscription not exist");
             }
+            requestHeader.setSysFlag(PullSysFlag.buildSysFlagWithSubscription(sysFlag));
             requestHeader.setSubscription(subscriptionData.getSubString());
             requestHeader.setExpressionType(subscriptionData.getExpressionType());
             request.writeCustomHeader(requestHeader);