You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/14 07:34:50 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Add the scope concept to logic queue

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 6a3eac9  Add the scope concept to logic queue
6a3eac9 is described below

commit 6a3eac93640acc874b4a4ecae9b8748e6c2c8a1f
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Dec 14 15:34:18 2021 +0800

    Add the scope concept to logic queue
---
 .../broker/processor/AdminBrokerProcessor.java     |  2 +-
 .../topic/TopicQueueMappingCleanService.java       |  4 +-
 .../broker/topic/TopicQueueMappingManager.java     |  3 +
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |  2 +-
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |  8 +--
 .../java/org/apache/rocketmq/common/MixAll.java    |  5 +-
 .../apache/rocketmq/common/rpc/ClientMetadata.java | 75 +++++++++++++---------
 .../common/statictopic/TopicQueueMappingInfo.java  | 43 ++++++++-----
 .../common/statictopic/TopicQueueMappingUtils.java | 17 +++++
 .../common/statictopic/TopicQueueMappingTest.java  |  2 +-
 docs/cn/statictopic/The_Scope_Of_Static_Topic.md   |  2 +-
 .../rocketmq/test/statictopic/StaticTopicIT.java   |  4 +-
 .../tools/admin/DefaultMQAdminExtTest.java         |  4 +-
 13 files changed, 108 insertions(+), 63 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 2d39538..02a0060 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1078,7 +1078,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 topicOffset.setMinOffset(min);
                 topicOffset.setMaxOffset(max);
                 topicOffset.setLastUpdateTimestamp(timestamp);
-                topicStatsTable.getOffsetTable().put(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, qid), topicOffset);
+                topicStatsTable.getOffsetTable().put(new MessageQueue(topic, TopicQueueMappingUtils.getMockBrokerName(mappingDetail.getScope()), qid), topicOffset);
             });
             return new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable);
         } catch (Throwable t) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
index 8374903..9518de0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
@@ -249,13 +249,13 @@ public class TopicQueueMappingCleanService extends ServiceThread {
                     Map<Integer, String> qid2RealLeaderBroker = new HashMap<>();
                     //fine the real leader
                     for (Map.Entry<Integer, String> entry : qid2CurrLeaderBroker.entrySet()) {
-                        qid2RealLeaderBroker.put(entry.getKey(), clientMetadata.getBrokerNameFromMessageQueue(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, entry.getKey())));
+                        qid2RealLeaderBroker.put(entry.getKey(), clientMetadata.getBrokerNameFromMessageQueue(new MessageQueue(topic, TopicQueueMappingUtils.getMockBrokerName(mappingDetail.getScope()), entry.getKey())));
                     }
 
                     //find the mapping detail of real leader
                     Map<String, TopicQueueMappingDetail> mappingDetailMap = new HashMap<>();
                     for (Map.Entry<Integer, String> entry : qid2RealLeaderBroker.entrySet()) {
-                        if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(entry.getValue())) {
+                        if (entry.getValue().startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {
                             continue;
                         }
                         String broker = entry.getValue();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 6965053..1633440 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -109,6 +109,9 @@ public class TopicQueueMappingManager extends ConfigManager {
             if (newDetail.getEpoch() < oldDetail.getEpoch()) {
                 throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
             }
+            if (!newDetail.getScope().equals(oldDetail.getScope())) {
+                throw new RuntimeException(String.format("Can't accept data with unmatched scope %s != %s", newDetail.getScope(), oldDetail.getScope()));
+            }
             boolean epochEqual = newDetail.getEpoch() == oldDetail.getEpoch();
             for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
                 List<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 53f8153..6d47573 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -576,7 +576,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
             String destBrokerName = brokerName;
-            if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(destBrokerName)) {
+            if (destBrokerName != null && destBrokerName.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {
                 destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPullConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId())));
             }
             String brokerAddr = (null != destBrokerName) ? this.mQClientFactory.findBrokerAddressInPublish(destBrokerName)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 6365f96..2b504cc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -723,8 +723,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
-            if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)
-                || (mq != null && MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName()))) {
+            if ((brokerName != null && brokerName.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX))
+                || (mq != null && mq.getBrokerName().startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX))) {
                 sendMessageBackAsNormalMessage(msg);
             } else {
                 String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
@@ -768,7 +768,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             String topic = message.getTopic();
 
             String desBrokerName = brokerName;
-            if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)) {
+            if (brokerName != null && brokerName.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {
                 desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId)));
             }
 
@@ -816,7 +816,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
 
         String desBrokerName = brokerName;
-        if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)) {
+        if (brokerName != null && brokerName.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {
             desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId)));
         }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 58928ea..cc12077 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -85,8 +85,9 @@ public class MixAll {
     public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
     public static final String REPLY_MESSAGE_FLAG = "reply";
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
-    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logic_broker__";
-    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logic_broker_none__";
+    public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__";
+    public static final String METADATA_SCOPE_GLOBAL = "__global__";
+    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__syslo__none__";
 
     public static String getWSAddr() {
         String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 078f616..4554557 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -7,6 +7,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
@@ -100,43 +101,57 @@ public class ClientMetadata {
                 || route.getTopicQueueMappingByBroker().isEmpty()) {
             return new ConcurrentHashMap<MessageQueue, String>();
         }
-        ConcurrentMap<MessageQueue, TopicQueueMappingInfo> mqEndPoints = new ConcurrentHashMap<MessageQueue, TopicQueueMappingInfo>();
-
-
-        List<Map.Entry<String, TopicQueueMappingInfo>> mappingInfos = new ArrayList<Map.Entry<String, TopicQueueMappingInfo>>(route.getTopicQueueMappingByBroker().entrySet());
-        Collections.sort(mappingInfos, new Comparator<Map.Entry<String, TopicQueueMappingInfo>>() {
-            @Override
-            public int compare(Map.Entry<String, TopicQueueMappingInfo> o1, Map.Entry<String, TopicQueueMappingInfo> o2) {
-                return  (int) (o2.getValue().getEpoch() - o1.getValue().getEpoch());
-            }
-        });
+        ConcurrentMap<MessageQueue, String> mqEndPointsOfBroker = new ConcurrentHashMap<MessageQueue, String>();
 
-        int maxTotalNums = 0;
-        long maxTotalNumOfEpoch = -1;
-        for (Map.Entry<String, TopicQueueMappingInfo> entry : mappingInfos) {
+        Map<String, Map<String, TopicQueueMappingInfo>> mappingInfosByScope = new HashMap<String, Map<String, TopicQueueMappingInfo>>();
+        for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
             TopicQueueMappingInfo info = entry.getValue();
-            if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) {
-                maxTotalNums = info.getTotalQueues();
-            }
-            for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
-                int globalId = idEntry.getKey();
-                MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
-                TopicQueueMappingInfo oldInfo = mqEndPoints.get(mq);
-                if (oldInfo == null ||  oldInfo.getEpoch() <= info.getEpoch()) {
-                    mqEndPoints.put(mq, info);
+            String scope = info.getScope();
+            if (scope != null) {
+                if (!mappingInfosByScope.containsKey(scope)) {
+                    mappingInfosByScope.put(scope, new HashMap<String, TopicQueueMappingInfo>());
                 }
+                mappingInfosByScope.get(scope).put(entry.getKey(), entry.getValue());
             }
         }
 
-        ConcurrentMap<MessageQueue, String> mqEndPointsOfBroker = new ConcurrentHashMap<MessageQueue, String>();
+        for (Map.Entry<String, Map<String, TopicQueueMappingInfo>> mapEntry : mappingInfosByScope.entrySet()) {
+            String scope = mapEntry.getKey();
+            Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap =  mapEntry.getValue();
+            ConcurrentMap<MessageQueue, TopicQueueMappingInfo> mqEndPoints = new ConcurrentHashMap<MessageQueue, TopicQueueMappingInfo>();
+            List<Map.Entry<String, TopicQueueMappingInfo>> mappingInfos = new ArrayList<Map.Entry<String, TopicQueueMappingInfo>>(topicQueueMappingInfoMap.entrySet());
+            Collections.sort(mappingInfos, new Comparator<Map.Entry<String, TopicQueueMappingInfo>>() {
+                @Override
+                public int compare(Map.Entry<String, TopicQueueMappingInfo> o1, Map.Entry<String, TopicQueueMappingInfo> o2) {
+                    return  (int) (o2.getValue().getEpoch() - o1.getValue().getEpoch());
+                }
+            });
+            int maxTotalNums = 0;
+            long maxTotalNumOfEpoch = -1;
+            for (Map.Entry<String, TopicQueueMappingInfo> entry : mappingInfos) {
+                TopicQueueMappingInfo info = entry.getValue();
+                if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) {
+                    maxTotalNums = info.getTotalQueues();
+                }
+                for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
+                    int globalId = idEntry.getKey();
+                    MessageQueue mq = new MessageQueue(topic, TopicQueueMappingUtils.getMockBrokerName(info.getScope()), globalId);
+                    TopicQueueMappingInfo oldInfo = mqEndPoints.get(mq);
+                    if (oldInfo == null ||  oldInfo.getEpoch() <= info.getEpoch()) {
+                        mqEndPoints.put(mq, info);
+                    }
+                }
+            }
+
 
-        //accomplish the static logic queues
-        for (int i = 0; i < maxTotalNums; i++) {
-            MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
-            if (!mqEndPoints.containsKey(mq)) {
-                mqEndPointsOfBroker.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
-            } else {
-                mqEndPointsOfBroker.put(mq, mqEndPoints.get(mq).getBname());
+            //accomplish the static logic queues
+            for (int i = 0; i < maxTotalNums; i++) {
+                MessageQueue mq = new MessageQueue(topic, TopicQueueMappingUtils.getMockBrokerName(scope), i);
+                if (!mqEndPoints.containsKey(mq)) {
+                    mqEndPointsOfBroker.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
+                } else {
+                    mqEndPointsOfBroker.put(mq, mqEndPoints.get(mq).getBname());
+                }
             }
         }
         return mqEndPointsOfBroker;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index a6a7eb5..e9cf6f7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.common.statictopic;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,6 +28,7 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     public static final int LEVEL_0 = 0;
 
     String topic; // redundant field
+    String scope = MixAll.METADATA_SCOPE_GLOBAL;
     int totalQueues;
     String bname;  //identify the hosted broker name
     long epoch; //important to fence the old dirty data
@@ -95,40 +97,47 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
         this.currIdMap = currIdMap;
     }
 
+    public String getScope() {
+        return scope;
+    }
+
+    public void setScope(String scope) {
+        this.scope = scope;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-
         if (!(o instanceof TopicQueueMappingInfo)) return false;
 
         TopicQueueMappingInfo info = (TopicQueueMappingInfo) o;
 
-        return new EqualsBuilder()
-                .append(totalQueues, info.totalQueues)
-                .append(epoch, info.epoch)
-                .append(dirty, info.dirty)
-                .append(topic, info.topic)
-                .append(bname, info.bname)
-                .append(currIdMap, info.currIdMap)
-                .isEquals();
+        if (totalQueues != info.totalQueues) return false;
+        if (epoch != info.epoch) return false;
+        if (dirty != info.dirty) return false;
+        if (topic != null ? !topic.equals(info.topic) : info.topic != null) return false;
+        if (scope != null ? !scope.equals(info.scope) : info.scope != null) return false;
+        if (bname != null ? !bname.equals(info.bname) : info.bname != null) return false;
+        return currIdMap != null ? currIdMap.equals(info.currIdMap) : info.currIdMap == null;
     }
 
     @Override
     public int hashCode() {
-        return new HashCodeBuilder(17, 37)
-                .append(topic)
-                .append(totalQueues)
-                .append(bname)
-                .append(epoch)
-                .append(dirty)
-                .append(currIdMap)
-                .toHashCode();
+        int result = topic != null ? topic.hashCode() : 0;
+        result = 31 * result + (scope != null ? scope.hashCode() : 0);
+        result = 31 * result + totalQueues;
+        result = 31 * result + (bname != null ? bname.hashCode() : 0);
+        result = 31 * result + (int) (epoch ^ (epoch >>> 32));
+        result = 31 * result + (dirty ? 1 : 0);
+        result = 31 * result + (currIdMap != null ? currIdMap.hashCode() : 0);
+        return result;
     }
 
     @Override
     public String toString() {
         return "TopicQueueMappingInfo{" +
                 "topic='" + topic + '\'' +
+                ", scope='" + scope + '\'' +
                 ", totalQueues=" + totalQueues +
                 ", bname='" + bname + '\'' +
                 ", epoch=" + epoch +
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 5c564a3..e7e7817 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -155,6 +155,7 @@ public class TopicQueueMappingUtils {
         //make sure it it not null
         long maxEpoch = -1;
         int maxNum = -1;
+        String scope = null;
         for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
             String broker = entry.getKey();
             TopicConfigAndQueueMapping configMapping = entry.getValue();
@@ -176,6 +177,13 @@ public class TopicQueueMappingUtils {
                 throw new RuntimeException("The topic name is not match for broker  " + broker);
             }
 
+            if (scope != null
+                && !scope.equals(mappingDetail.getScope())) {
+                throw new RuntimeException(String.format("scope dose not match %s != %s in %s", mappingDetail.getScope(), scope, broker));
+            } else {
+                scope = mappingDetail.getScope();
+            }
+
             if (maxEpoch != -1
                 && maxEpoch != mappingDetail.getEpoch()) {
                 throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpoch, mappingDetail.getEpoch(), mappingDetail.getBname()));
@@ -193,6 +201,15 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
     }
 
+    public static String getMockBrokerName(String scope) {
+        assert scope != null;
+        if (scope.equals(MixAll.METADATA_SCOPE_GLOBAL)) {
+            return MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX + scope.substring(2);
+        } else {
+            return MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX + scope;
+        }
+    }
+
     public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems, boolean epochEqual, boolean isCLean) {
         if (oldItems == null || oldItems.isEmpty()) {
             return;
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
index b4a8dda..f1a2b21 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
@@ -43,7 +43,7 @@ public class TopicQueueMappingTest {
         {
             Map  mappingDetailMap = JSON.parseObject(mappingDetailJson);
             Assert.assertTrue(mappingDetailMap.containsKey("currIdMap"));
-            Assert.assertEquals(7, mappingDetailMap.size());
+            Assert.assertEquals(8, mappingDetailMap.size());
             Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size());
             Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size());
         }
diff --git a/docs/cn/statictopic/The_Scope_Of_Static_Topic.md b/docs/cn/statictopic/The_Scope_Of_Static_Topic.md
index 56999e4..9c9973c 100644
--- a/docs/cn/statictopic/The_Scope_Of_Static_Topic.md
+++ b/docs/cn/statictopic/The_Scope_Of_Static_Topic.md
@@ -90,7 +90,7 @@ RocketMQ 多个集群的元数据可以无缝在Nameserver处汇聚,同时又
 Static Topic 实现 单集群固定 和 全网固定 两种Scope,以适配『全球容灾集群』。
 多集群,暂时没有必要。
 
-一期只实现 单集群固定 这个Scope。
+一期只实现 全网固定 这个Scope,但在格式上注意兼容。
 
 #### SOT 增加 Scope 字段
 ```
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index 38811f6..bc7cf25 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -148,7 +148,7 @@ public class StaticTopicIT extends BaseConf {
         for (int i = 0; i < queueNum; i++) {
             MessageQueue messageQueue = messageQueueList.get(i);
             Assert.assertEquals(topic, messageQueue.getTopic());
-            Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+            Assert.assertEquals(TopicQueueMappingUtils.getMockBrokerName(MixAll.METADATA_SCOPE_GLOBAL), messageQueue.getBrokerName());
             Assert.assertEquals(i, messageQueue.getQueueId());
             String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
             Assert.assertTrue(targetBrokers.contains(destBrokerName));
@@ -211,7 +211,7 @@ public class StaticTopicIT extends BaseConf {
                 MessageExt messageExt = messageExts.get(j);
                 int currGen = startGen + j / msgEachQueue;
                 Assert.assertEquals(topic, messageExt.getTopic());
-                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageExt.getBrokerName());
+                Assert.assertEquals(TopicQueueMappingUtils.getMockBrokerName(MixAll.METADATA_SCOPE_GLOBAL), messageExt.getBrokerName());
                 Assert.assertEquals(i, messageExt.getQueueId());
                 Assert.assertEquals((j % msgEachQueue) + currGen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExt.getQueueOffset());
             }
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index 75ec124..deb3d05 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -455,14 +455,14 @@ public class DefaultMQAdminExtTest {
     public void testMaxOffset_LogicalQueue() throws Exception {
         when(mQClientAPIImpl.getMaxOffset(eq(broker2Addr), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(10L);
 
-        assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0))).isEqualTo(1010L);
+        assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX, 0))).isEqualTo(1010L);
     }
 
     @Test
     public void testSearchOffset_LogicalQueue() throws Exception {
         when(mQClientAPIImpl.searchOffset(eq(broker2Addr), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(11L);
 
-        assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0), System.currentTimeMillis())).isEqualTo(1011L);
+        assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX, 0), System.currentTimeMillis())).isEqualTo(1011L);
     }
 
     @Test