You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/02 10:58:18 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the resetZero logic for logic queue
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 5b77793 Polish the resetZero logic for logic queue
5b77793 is described below
commit 5b777932d941b1833a510125377a426cf03604ce
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 2 18:57:52 2021 +0800
Polish the resetZero logic for logic queue
---
.../rocketmq/broker/processor/ConsumerManageProcessor.java | 12 +++++++++---
.../protocol/header/QueryConsumerOffsetRequestHeader.java | 10 ++++++++++
.../common/statictopic/TopicQueueMappingContext.java | 5 -----
3 files changed, 19 insertions(+), 8 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index aad9454..0443396 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -141,8 +141,10 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
}
- if (mappingContext.checkIfAsPhysical()) {
- //let it go
+ List<LogicQueueMappingItem> mappingItemList = mappingContext.getMappingItemList();
+ if (mappingItemList.size() == 1
+ && mappingItemList.get(0).getLogicOffset() == 0) {
+ //as physical, just let it go
requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
return null;
}
@@ -169,6 +171,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
requestHeader.setBname(mappingItem.getBname());
requestHeader.setQueueId(mappingItem.getQueueId());
requestHeader.setPhysical(true);
+ requestHeader.setSetZeroIfNotFound(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
@@ -230,7 +233,10 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
- if (minOffset <= 0
+ if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) {
+ response.setCode(ResponseCode.QUERY_NOT_FOUND);
+ response.setRemark("Not found, do not set to zero, maybe this group boot first");
+ }else if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
index b7714da..ebcbe0d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -32,6 +32,8 @@ public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private Integer queueId;
+ private Boolean setZeroIfNotFound;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -63,4 +65,12 @@ public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
+
+ public Boolean getSetZeroIfNotFound() {
+ return setZeroIfNotFound;
+ }
+
+ public void setSetZeroIfNotFound(Boolean setZeroIfNotFound) {
+ this.setZeroIfNotFound = setZeroIfNotFound;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
index f705c43..d6a6fd9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
@@ -38,11 +38,6 @@ public class TopicQueueMappingContext {
}
- public boolean checkIfAsPhysical() {
- return mappingDetail == null
- || mappingItemList == null
- || (mappingItemList.size() == 1 && mappingItemList.get(0).getLogicOffset() == 0);
- }
public boolean isLeader() {
return leaderItem != null && leaderItem.getBname().equals(mappingDetail.getBname());