You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:59 UTC
[rocketmq] 18/26: [ISSUE #5406] Overwrite sysFlag to broker
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 9897e74a6c38200f6200c7bc69c7a118ec4ad7fd
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Fri Nov 18 15:46:44 2022 +0800
[ISSUE #5406] Overwrite sysFlag to broker
---
.../src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java | 4 ++++
.../apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java | 4 +++-
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
index 20b8ad208..15d56dde7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
@@ -77,6 +77,10 @@ public class PullSysFlag {
return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
}
+ public static int buildSysFlagWithSubscription(final int sysFlag) {
+ return sysFlag | FLAG_SUBSCRIPTION;
+ }
+
public static boolean hasClassFilterFlag(final int sysFlag) {
return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
index 873b52460..eb744676a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
@@ -39,7 +39,8 @@ public class PullMessageActivity extends AbstractRemotingActivity {
protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
- if (!PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())) {
+ int sysFlag = requestHeader.getSysFlag();
+ if (!PullSysFlag.hasSubscriptionFlag(sysFlag)) {
ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (consumerInfo == null) {
return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST,
@@ -50,6 +51,7 @@ public class PullMessageActivity extends AbstractRemotingActivity {
return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_EXIST,
"the consumer's subscription not exist");
}
+ requestHeader.setSysFlag(PullSysFlag.buildSysFlagWithSubscription(sysFlag));
requestHeader.setSubscription(subscriptionData.getSubString());
requestHeader.setExpressionType(subscriptionData.getExpressionType());
request.writeCustomHeader(requestHeader);