You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/12/05 07:23:47 UTC

[rocketmq] 01/01: Fix bug that make static topic ITs unable to pass

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dev-27
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 883418d8a30fd702529591e4687c415454a75932
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Mon Dec 5 15:23:15 2022 +0800

    Fix bug that make static topic ITs unable to pass
---
 .../broker/processor/ConsumerManageProcessor.java       | 17 ++++++++---------
 .../client/consumer/store/RemoteBrokerOffsetStore.java  |  1 -
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 53bb5b9f8..395102c7e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -110,7 +110,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         return response;
     }
 
-    public  RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRequestHeader requestHeader, final TopicQueueMappingContext mappingContext) {
+    public RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRequestHeader requestHeader,
+        final TopicQueueMappingContext mappingContext) {
         try {
             if (mappingContext.getMappingDetail() == null) {
                 return null;
@@ -140,7 +141,6 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         }
     }
 
-
     private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
         throws RemotingCommandException {
 
@@ -201,8 +201,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         return response;
     }
 
-
-    public  RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
+    public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader,
+        TopicQueueMappingContext mappingContext) {
         try {
             if (mappingContext.getMappingDetail() == null) {
                 return null;
@@ -213,7 +213,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
             }
             List<LogicQueueMappingItem> mappingItemList = mappingContext.getMappingItemList();
             if (mappingItemList.size() == 1
-                    &&  mappingItemList.get(0).getLogicOffset() == 0) {
+                && mappingItemList.get(0).getLogicOffset() == 0) {
                 //as physical, just let it go
                 mappingContext.setCurrentItem(mappingItemList.get(0));
                 requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
@@ -277,8 +277,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         }
     }
 
-
-    public  RemotingCommand rewriteResponseForStaticTopic(final QueryConsumerOffsetRequestHeader requestHeader, final QueryConsumerOffsetResponseHeader responseHeader,
+    public RemotingCommand rewriteResponseForStaticTopic(final QueryConsumerOffsetRequestHeader requestHeader,
+        final QueryConsumerOffsetResponseHeader responseHeader,
         final TopicQueueMappingContext mappingContext, final int code) {
         try {
             if (mappingContext.getMappingDetail() == null) {
@@ -306,9 +306,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
             (QueryConsumerOffsetRequestHeader) request
                 .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
 
-
         TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
-        RemotingCommand rewriteResult  = rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 21c7cd3a0..1b9cd63db 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -240,7 +240,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
             requestHeader.setConsumerGroup(this.groupName);
             requestHeader.setQueueId(mq.getQueueId());
             requestHeader.setBname(mq.getBrokerName());
-            requestHeader.setSetZeroIfNotFound(false);
 
             return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
                 findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);