You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/11 08:44:18 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Finish the
rewrite logic for SEARCH_OFFSET_BY_TIMESTAMP
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 507553b Finish the rewrite logic for SEARCH_OFFSET_BY_TIMESTAMP
507553b is described below
commit 507553bd2031d5fe76c3b1cc146d4e624df16d4b
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 11 16:44:03 2021 +0800
Finish the rewrite logic for SEARCH_OFFSET_BY_TIMESTAMP
---
.../broker/processor/AdminBrokerProcessor.java | 52 +++++++++++++++++-----
.../rocketmq/common/LogicQueueMappingItem.java | 11 ++++-
2 files changed, 51 insertions(+), 12 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0b6fa48..5f54889 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -119,6 +119,8 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RpcRequest;
+import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -625,15 +627,46 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
ImmutableList<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
- //TODO should make sure the offset timestamp is equal or bigger than the searched timestamp
- for (int i = mappingItems.size() - 1; i >=0; i--) {
+ //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
+ Long timestamp = requestHeader.getTimestamp();
+ long offset = -1;
+ for (int i = 0; i < mappingItems.size(); i++) {
+ LogicQueueMappingItem item = mappingItems.get(i);
+ if (mappingDetail.getBname().equals(item.getBname())) {
+ //means the leader
+ assert i == mappingItems.size() - 1;
+ offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
+ if (offset > 0) {
+ offset = item.computeStaticQueueOffset(offset);
+ }
+ } else {
+ requestHeader.setPhysical(true);
+ requestHeader.setTimestamp(timestamp);
+ requestHeader.setQueueId(item.getQueueId());
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(item.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader)rpcResponse.getHeader();
+ if (offsetResponseHeader.getOffset() < 0
+ || (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) {
+ continue;
+ } else {
+ offset = item.computeStaticQueueOffset(offsetResponseHeader.getOffset());
+ }
+ }
}
- requestHeader.setQueueId(mappingItem.getQueueId());
- return null;
+ final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
+ final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(offset);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
@@ -646,12 +679,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
final SearchOffsetRequestHeader requestHeader =
(SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
- {
- TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
- TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
- if (mappingDetail != null) {
+ TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
- }
+ RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ if (rewriteResult != null) {
+ return rewriteResult;
}
long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
diff --git a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index 8d1b164..ad0ed96 100644
--- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -6,8 +6,8 @@ public class LogicQueueMappingItem {
private int queueId;
private String bname;
private long logicOffset; // the start of the logic offset
- private long startOffset; // the start of the physical offset
- private long endOffset; // the end of the physical offset
+ private long startOffset; // the start of the physical offset, included
+ private long endOffset; // the end of the physical offset, excluded
private long timeOfStart = -1; // mutable
private long timeOfEnd = -1; // mutable
@@ -23,6 +23,13 @@ public class LogicQueueMappingItem {
}
public long computeStaticQueueOffset(long physicalQueueOffset) {
+ if (physicalQueueOffset < startOffset) {
+ return logicOffset;
+ }
+ if (endOffset >= startOffset
+ && endOffset < physicalQueueOffset) {
+ return logicOffset + (endOffset - startOffset);
+ }
return logicOffset + (physicalQueueOffset - startOffset);
}