You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/03 07:34:56 UTC
[rocketmq] 01/03: Fix the max offset logic for logicOffset = -1
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 4f96f72c1c24b557ec5998a6985a353f64ec5f6c
Author: dongeforever <do...@apache.org>
AuthorDate: Fri Dec 3 15:12:11 2021 +0800
Fix the max offset logic for logicOffset = -1
---
.../broker/processor/AdminBrokerProcessor.java | 53 ++++++++++++++++------
.../broker/processor/PullMessageProcessor.java | 6 +--
.../broker/processor/SendMessageProcessor.java | 2 +-
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 25 ++++++++++
.../common/statictopic/LogicQueueMappingItem.java | 19 ++++----
.../common/statictopic/TopicQueueMappingUtils.java | 3 --
.../apache/rocketmq/tools/admin/MQAdminUtils.java | 3 +-
7 files changed, 78 insertions(+), 33 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 33d3025..a286818 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -633,12 +633,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
long offset = -1;
for (int i = 0; i < mappingItems.size(); i++) {
LogicQueueMappingItem item = mappingItems.get(i);
+ if (!item.checkIfLogicoffsetDecided()) {
+ continue;
+ }
if (mappingDetail.getBname().equals(item.getBname())) {
- //means the leader
- assert i == mappingItems.size() - 1;
offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
if (offset > 0) {
- offset = item.computeStaticQueueOffsetUpToEnd(offset);
+ offset = item.computeStaticQueueOffsetStrictly(offset);
+ break;
}
} else {
requestHeader.setPhysical(true);
@@ -650,12 +652,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
- SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader)rpcResponse.getHeader();
+ SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader) rpcResponse.getHeader();
if (offsetResponseHeader.getOffset() < 0
|| (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) {
continue;
} else {
- offset = item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
+ offset = item.computeStaticQueueOffsetStrictly(offsetResponseHeader.getOffset());
}
}
@@ -705,17 +707,38 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
- long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
- offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset);
+ try {
+ LogicQueueMappingItem maxItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), Long.MAX_VALUE, true);
+ assert maxItem != null;
+ assert maxItem.getLogicOffset() >= 0;
+ requestHeader.setBname(maxItem.getBname());
+ requestHeader.setPhysical(true);
+ requestHeader.setQueueId(mappingItem.getQueueId());
+ long maxPhysicalOffset = Long.MAX_VALUE;
+ if (maxItem.getBname().equals(mappingDetail.getBname())) {
+ //current broker
+ maxPhysicalOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
+ } else {
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MAX_OFFSET, requestHeader, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ GetMaxOffsetResponseHeader offsetResponseHeader = (GetMaxOffsetResponseHeader) rpcResponse.getHeader();
+ maxPhysicalOffset = offsetResponseHeader.getOffset();
+ }
- final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
- final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
- responseHeader.setOffset(offset);
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
- return response;
+ final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
+ final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(maxItem.computeStaticQueueOffsetStrictly(maxPhysicalOffset));
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ } catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ }
}
private RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
@@ -770,7 +793,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
physicalOffset = offsetResponseHeader.getOffset();
}
- long offset = mappingItem.computeStaticQueueOffsetUpToEnd(physicalOffset);
+ long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset);
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
@@ -820,7 +843,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
try {
requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true);
- RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
//TODO check if it is in current broker
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 6bac707..9b8135e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -274,11 +274,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
&& nextBeginOffset >= currentItem.getEndOffset()) {
nextBeginOffset = currentItem.getEndOffset();
}
- responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
+ responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetStrictly(nextBeginOffset));
//handle min offset
- responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetUpToEnd(Math.max(currentItem.getStartOffset(), minOffset)));
+ responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetStrictly(Math.max(currentItem.getStartOffset(), minOffset)));
//handle max offset
- responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetUpToEnd(maxOffset),
+ responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetStrictly(maxOffset),
TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
//set the offsetDelta
responseHeader.setOffsetDelta(currentItem.computeOffsetDelta());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index c229ce3..bdcba39 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
//no need to care the broker name
- long staticLogicOffset = mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset());
+ long staticLogicOffset = mappingItem.computeStaticQueueOffsetLoosely(responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 713bbf9..47ffcc2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -7,6 +7,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
@@ -70,6 +71,9 @@ public class RpcClientImpl implements RpcClient {
case RequestCode.GET_MIN_OFFSET:
rpcResponsePromise = handleGetMinOffset(addr, request, timeoutMs);
break;
+ case RequestCode.GET_MAX_OFFSET:
+ rpcResponsePromise = handleGetMaxOffset(addr, request, timeoutMs);
+ break;
case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
rpcResponsePromise = handleSearchOffset(addr, request, timeoutMs);
break;
@@ -226,6 +230,27 @@ public class RpcClientImpl implements RpcClient {
return rpcResponsePromise;
}
+ public Promise<RpcResponse> handleGetMaxOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+ final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+ RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+
+ RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+ assert responseCommand != null;
+ switch (responseCommand.getCode()) {
+ case ResponseCode.SUCCESS: {
+ GetMaxOffsetResponseHeader responseHeader =
+ (GetMaxOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
+ rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+ break;
+ }
+ default:{
+ rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
+ }
+ }
+ return rpcResponsePromise;
+ }
+
public Promise<RpcResponse> handleGetEarliestMsgStoretime(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 9f79e9d..76e7406 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -31,7 +31,9 @@ public class LogicQueueMappingItem extends RemotingSerializable {
this.timeOfEnd = timeOfEnd;
}
- public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) {
+
+ //should only be user in sendMessage and getMinOffset
+ public long computeStaticQueueOffsetLoosely(long physicalQueueOffset) {
//consider the newly mapped item
if (logicOffset < 0) {
return -1;
@@ -46,10 +48,9 @@ public class LogicQueueMappingItem extends RemotingSerializable {
return logicOffset + (physicalQueueOffset - startOffset);
}
- public long computeStaticQueueOffset(long physicalQueueOffset) {
- if (logicOffset < 0) {
- return logicOffset;
- }
+ public long computeStaticQueueOffsetStrictly(long physicalQueueOffset) {
+ assert logicOffset >= 0;
+
if (physicalQueueOffset < startOffset) {
return logicOffset;
}
@@ -67,15 +68,15 @@ public class LogicQueueMappingItem extends RemotingSerializable {
return logicOffset;
}
}
- public boolean checkIfShouldDeleted() {
- return endOffset == startOffset;
- }
-
public boolean checkIfEndOffsetDecided() {
//if the endOffset == startOffset, then the item should be deleted
return endOffset > startOffset;
}
+ public boolean checkIfLogicoffsetDecided() {
+ return logicOffset >= 0;
+ }
+
public long computeOffsetDelta() {
return logicOffset - startOffset;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index e56d585..8aa1574 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -632,9 +632,6 @@ public class TopicQueueMappingUtils {
if (ignoreNegative && item.getLogicOffset() < 0) {
continue;
}
- if (!item.checkIfShouldDeleted()) {
- return mappingItems.get(i);
- }
}
return null;
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index 288bac4..eed6b76 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -16,7 +16,6 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
-import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -165,7 +164,7 @@ public class MQAdminUtils {
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
- newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
+ newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffsetStrictly(topicOffset.getMaxOffset()), blockSeqSize));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);