You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/12/05 07:23:47 UTC
[rocketmq] 01/01: Fix bug that makes static topic ITs unable to pass
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch dev-27
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 883418d8a30fd702529591e4687c415454a75932
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Mon Dec 5 15:23:15 2022 +0800
Fix bug that makes static topic ITs unable to pass
---
.../broker/processor/ConsumerManageProcessor.java | 17 ++++++++---------
.../client/consumer/store/RemoteBrokerOffsetStore.java | 1 -
2 files changed, 8 insertions(+), 10 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 53bb5b9f8..395102c7e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -110,7 +110,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
return response;
}
- public RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRequestHeader requestHeader, final TopicQueueMappingContext mappingContext) {
+ public RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRequestHeader requestHeader,
+ final TopicQueueMappingContext mappingContext) {
try {
if (mappingContext.getMappingDetail() == null) {
return null;
@@ -140,7 +141,6 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
}
}
-
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
@@ -201,8 +201,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
return response;
}
-
- public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
+ public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader,
+ TopicQueueMappingContext mappingContext) {
try {
if (mappingContext.getMappingDetail() == null) {
return null;
@@ -213,7 +213,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
}
List<LogicQueueMappingItem> mappingItemList = mappingContext.getMappingItemList();
if (mappingItemList.size() == 1
- && mappingItemList.get(0).getLogicOffset() == 0) {
+ && mappingItemList.get(0).getLogicOffset() == 0) {
//as physical, just let it go
mappingContext.setCurrentItem(mappingItemList.get(0));
requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
@@ -277,8 +277,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
}
}
-
- public RemotingCommand rewriteResponseForStaticTopic(final QueryConsumerOffsetRequestHeader requestHeader, final QueryConsumerOffsetResponseHeader responseHeader,
+ public RemotingCommand rewriteResponseForStaticTopic(final QueryConsumerOffsetRequestHeader requestHeader,
+ final QueryConsumerOffsetResponseHeader responseHeader,
final TopicQueueMappingContext mappingContext, final int code) {
try {
if (mappingContext.getMappingDetail() == null) {
@@ -306,9 +306,8 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
-
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
- RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 21c7cd3a0..1b9cd63db 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -240,7 +240,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setBname(mq.getBrokerName());
- requestHeader.setSetZeroIfNotFound(false);
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);