You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/11 08:44:18 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Finish the rewrite logic for SEARCH_OFFSET_BY_TIMESTAMP

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 507553b  Finish the rewrite logic for SEARCH_OFFSET_BY_TIMESTAMP
507553b is described below

commit 507553bd2031d5fe76c3b1cc146d4e624df16d4b
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 11 16:44:03 2021 +0800

    Finish the rewrite logic for SEARCH_OFFSET_BY_TIMESTAMP
---
 .../broker/processor/AdminBrokerProcessor.java     | 52 +++++++++++++++++-----
 .../rocketmq/common/LogicQueueMappingItem.java     | 11 ++++-
 2 files changed, 51 insertions(+), 12 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0b6fa48..5f54889 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -119,6 +119,8 @@ import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.filter.util.BitsArray;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RpcRequest;
+import org.apache.rocketmq.remoting.RpcResponse;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -625,15 +627,46 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
             if (mappingItem == null
                     || !mappingDetail.getBname().equals(mappingItem.getBname())) {
-                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
             }
             ImmutableList<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
-            //TODO should make sure the offset timestamp is equal or bigger than the searched timestamp
-            for (int i = mappingItems.size() - 1; i >=0; i--) {
+            //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
+            Long timestamp = requestHeader.getTimestamp();
+            long offset = -1;
+            for (int i = 0; i < mappingItems.size(); i++) {
+                LogicQueueMappingItem item = mappingItems.get(i);
+                if (mappingDetail.getBname().equals(item.getBname())) {
+                    //means the leader
+                    assert i ==  mappingItems.size() - 1;
+                    offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
+                    if (offset > 0) {
+                        offset = item.computeStaticQueueOffset(offset);
+                    }
+                } else {
+                    requestHeader.setPhysical(true);
+                    requestHeader.setTimestamp(timestamp);
+                    requestHeader.setQueueId(item.getQueueId());
+                    RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null);
+                    RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(item.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+                    if (rpcResponse.getException() != null) {
+                        throw rpcResponse.getException();
+                    }
+                    SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader)rpcResponse.getHeader();
+                    if (offsetResponseHeader.getOffset() < 0
+                            || (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) {
+                        continue;
+                    } else {
+                        offset = item.computeStaticQueueOffset(offsetResponseHeader.getOffset());
+                    }
 
+                }
             }
-            requestHeader.setQueueId(mappingItem.getQueueId());
-            return null;
+            final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
+            final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
+            responseHeader.setOffset(offset);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
         } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
@@ -646,12 +679,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         final SearchOffsetRequestHeader requestHeader =
             (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
 
-        {
-            TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
-            TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
-            if (mappingDetail != null) {
+        TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
 
-            }
+        RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        if (rewriteResult != null) {
+            return rewriteResult;
         }
 
         long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
diff --git a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index 8d1b164..ad0ed96 100644
--- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -6,8 +6,8 @@ public class LogicQueueMappingItem {
     private int queueId;
     private String bname;
     private long logicOffset; // the start of the logic offset
-    private long startOffset; // the start of the physical offset
-    private long endOffset; // the end of the physical offset
+    private long startOffset; // the start of the physical offset, included
+    private long endOffset; // the end of the physical offset, excluded
     private long timeOfStart = -1; // mutable
     private long timeOfEnd = -1; // mutable
 
@@ -23,6 +23,13 @@ public class LogicQueueMappingItem {
     }
 
     public long computeStaticQueueOffset(long physicalQueueOffset) {
+        if (physicalQueueOffset < startOffset) {
+            return logicOffset;
+        }
+        if (endOffset >= startOffset
+            && endOffset < physicalQueueOffset) {
+            return logicOffset + (endOffset - startOffset);
+        }
         return  logicOffset + (physicalQueueOffset - startOffset);
     }