You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/10 06:57:29 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the rewrite logic for SendProcessor

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 36a82fa  Polish the rewrite logic for SendProcessor
36a82fa is described below

commit 36a82fa67785aa4e98a2b6a6ceeda576e9cf66b7
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 10 14:57:19 2021 +0800

    Polish the rewrite logic for SendProcessor
---
 .../broker/processor/SendMessageProcessor.java     | 29 ++++++++++++++++------
 .../rocketmq/common/TopicQueueMappingDetail.java   | 23 +++++++++++++++++
 2 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 9daebf4..7b88c75 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.broker.processor;
 
 import java.net.SocketAddress;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -126,16 +127,28 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         try {
             TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
             if (mappingDetail == null) {
+                //it is not static topic
                 return null;
             }
-            if (!mappingDetail.getCurrIdMap().containsKey(requestHeader.getQueueId())) {
-               return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
+            Integer phyQueueId = null;
+            //compatible with the old logic, but it fact, this should not happen
+            if (requestHeader.getQueueId() < 0) {
+                Iterator<Map.Entry<Integer, Integer>> it  = mappingDetail.getCurrIdMap().entrySet().iterator();
+                if (it.hasNext()) {
+                    phyQueueId = it.next().getValue();
+                }
+            } else {
+                phyQueueId = mappingDetail.getCurrIdMap().get(requestHeader.getQueueId());
+            }
+            if (phyQueueId == null) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
+            } else {
+                requestHeader.setQueueId(phyQueueId);
+                return null;
             }
-            requestHeader.setQueueId(mappingDetail.getCurrIdMap().get(requestHeader.getQueueId()));
         } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
-        return null;
     }
 
     private RemotingCommand rewriteResponseForStaticTopic(String topic, SendMessageResponseHeader responseHeader) {
@@ -144,14 +157,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             if (mappingDetail == null) {
                 return null;
             }
-            if (!mappingDetail.getCurrIdMap().containsKey(responseHeader.getQueueId())) {
-                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
+            Integer globalId = mappingDetail.getCurrIdMapRevert().get(responseHeader.getQueueId());
+            if (globalId == null) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exist in response process of current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
             }
-            long staticLogicOffset = mappingDetail.convertToLogicOffset(responseHeader.getQueueId(), responseHeader.getQueueOffset());
+            long staticLogicOffset = mappingDetail.convertToLogicOffset(globalId, responseHeader.getQueueOffset());
             if (staticLogicOffset < 0) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
 
             }
+            responseHeader.setQueueId(globalId);
             responseHeader.setQueueOffset(staticLogicOffset);
         } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index a181130..0021310 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -27,6 +27,9 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
 
     // the mapping info in current broker, do not register to nameserver
     ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+    transient ConcurrentMap<Integer/*physicalId*/, Integer/*logicId*/> currIdMapRevert = new ConcurrentHashMap<Integer, Integer>();
+
+
 
     public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
         super(topic, totalQueues, bname);
@@ -45,6 +48,18 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
     public void buildIdMap() {
         this.currIdMap = buildIdMap(LEVEL_0);
         this.prevIdMap = buildIdMap(LEVEL_1);
+        this.currIdMapRevert = revert(this.currIdMap);
+    }
+
+    public ConcurrentMap<Integer, Integer> revert(ConcurrentMap<Integer, Integer> original) {
+        if (original == null || original.isEmpty()) {
+            return new ConcurrentHashMap<Integer, Integer>();
+        }
+        ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
+        for (Map.Entry<Integer, Integer> entry: tmpIdMap.entrySet()) {
+            tmpIdMap.put(entry.getValue(), entry.getKey());
+        }
+        return tmpIdMap;
     }
 
     public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
@@ -107,6 +122,14 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
         return topicQueueMappingInfo;
     }
 
+    public ConcurrentMap<Integer, Integer> getCurrIdMapRevert() {
+        return currIdMapRevert;
+    }
+
+    public void setCurrIdMapRevert(ConcurrentMap<Integer, Integer> currIdMapRevert) {
+        this.currIdMapRevert = currIdMapRevert;
+    }
+
     public int getTotalQueues() {
         return totalQueues;
     }