You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/06 09:47:50 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated (d677d85 -> 12915b8)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a change to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


    from d677d85  Polish doc
     new 527382e  Clean the items more than second gen
     new 12915b8  Add clean item logic for topic queue mapping

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/rocketmq/broker/BrokerController.java   |  25 ++++
 .../broker/processor/AdminBrokerProcessor.java     |   5 +-
 .../broker/topic/TopicQueueMappingManager.java     | 140 ++++++++++++++++++++-
 .../apache/rocketmq/common/admin/TopicOffset.java  |   9 ++
 .../header/GetTopicStatsInfoRequestHeader.java     |   3 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |  24 +++-
 .../apache/rocketmq/common/rpc/RpcResponse.java    |   2 +-
 .../statictopic/TopicQueueMappingDetail.java       |  13 ++
 .../common/statictopic/TopicQueueMappingInfo.java  |  12 ++
 .../namesrv/routeinfo/RouteInfoManager.java        |   1 +
 10 files changed, 228 insertions(+), 6 deletions(-)

[rocketmq] 02/02: Add clean item logic for topic queue mapping

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 12915b807f54c7726fddae5cd0c3e177950850c8
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Dec 6 17:47:28 2021 +0800

    Add clean item logic for topic queue mapping
---
 .../apache/rocketmq/broker/BrokerController.java   | 25 ++++++
 .../broker/topic/TopicQueueMappingManager.java     | 90 ++++++++++++++++++++++
 .../apache/rocketmq/common/admin/TopicOffset.java  |  9 +++
 .../header/GetTopicStatsInfoRequestHeader.java     |  3 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  | 24 +++++-
 .../apache/rocketmq/common/rpc/RpcResponse.java    |  2 +-
 6 files changed, 150 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9230d95..6ca46dc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -163,6 +163,9 @@ public class BrokerController {
     private final BrokerOuterAPI brokerOuterAPI;
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
         "BrokerControllerScheduledThread"));
+    //the topic queue mapping is costly, so use an independent executor
+    private final ScheduledExecutorService scheduledForTopicQueueMapping = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "BrokerControllerScheduledThread-TopicQueueMapping"));
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
     private final BlockingQueue<Runnable> ackThreadPoolQueue;
@@ -498,6 +501,22 @@ public class BrokerController {
                 }
             }, 1, 5, TimeUnit.SECONDS);
 
+            this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
+                try {
+                    this.topicQueueMappingManager.cleanItemListMoreThanSecondGen();
+                } catch (Throwable t) {
+                    log.error("ScheduledTask cleanItemListMoreThanSecondGen failed", t);
+                }
+            }, 1, 5, TimeUnit.MINUTES);
+
+            this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
+                try {
+                    this.topicQueueMappingManager.cleanItemExpired();
+                } catch (Throwable t) {
+                    log.error("ScheduledTask cleanItemExpired failed", t);
+                }
+            }, 1, 5, TimeUnit.MINUTES);
+
             if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                     if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
@@ -892,6 +911,12 @@ public class BrokerController {
         } catch (InterruptedException e) {
         }
 
+        this.scheduledForTopicQueueMapping.shutdown();
+        try {
+            this.scheduledForTopicQueueMapping.awaitTermination(5000, TimeUnit.MILLISECONDS);
+        } catch (Throwable ignored) {
+        }
+
         this.unregisterBrokerAll();
 
         if (this.sendMessageExecutor != null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index c484bcf..9be442e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -21,6 +21,14 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -32,10 +40,14 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.DefaultMessageStore;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -272,4 +284,82 @@ public class TopicQueueMappingManager extends ConfigManager {
         }
     }
 
+
+    /** Removes the earliest mapping item of each hosted queue when its broker reports minOffset == maxOffset. */
+    public void cleanItemExpired() {
+        String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
+        if (!UtilAll.isItTimeToDo(when)) { // gated by the message store's deleteWhen setting
+            return;
+        }
+        boolean changed = false;
+        long start = System.currentTimeMillis();
+        try {
+            for (String topic : topicQueueMappingTable.keySet()) {
+                TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
+                if (mappingDetail == null || mappingDetail.getHostedQueues().isEmpty()) {
+                    continue;
+                }
+                if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
+                    log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
+                    continue;
+                }
+                Set<String> brokers = new HashSet<>(); // brokers that host the earliest item of some queue
+                for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+                    if (items.size() < 2) { // a queue's only remaining item is never a removal candidate
+                        continue;
+                    }
+                    LogicQueueMappingItem earliestItem = items.get(0);
+                    brokers.add(earliestItem.getBname());
+                }
+                Map<String, TopicStatsTable> statsTable = new HashMap<>(); // one stats lookup per broker, reused below
+                for (String broker: brokers) {
+                    GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+                    header.setTopic(topic);
+                    header.setBname(broker);
+                    try {
+                        RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+                        RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+                        if (rpcResponse.getException() != null) {
+                            throw rpcResponse.getException();
+                        }
+                        statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+                    } catch (Throwable rt) { // best effort: items on this broker are skipped in the loop below
+                        log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+                    }
+                }
+                for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+                    if (items.size() < 2) {
+                        continue;
+                    }
+                    LogicQueueMappingItem earliestItem = items.get(0);
+                    TopicStatsTable topicStats = statsTable.get(earliestItem.getBname());
+                    if (topicStats == null) { // stats lookup failed for this broker; try again on the next run
+                        continue;
+                    }
+                    TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earliestItem.getBname(), earliestItem.getQueueId()));
+                    if (topicOffset == null) {
+                        //this should not happen
+                        log.warn("Get null topicOffset for {}", earliestItem);
+                        continue;
+                    }
+                    if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
+                        boolean result = items.remove(earliestItem);
+                        changed = changed || result;
+                        log.info("The logic queue item {} is removed {} because of {}", earliestItem, result, topicOffset); // args now follow placeholder order
+                    }
+                }
+                UtilAll.sleep(10); // short pause between topics; presumably to throttle the remote calls
+            }
+        } catch (Throwable t) {
+            log.error("Try cleanItemExpired failed", t);
+        } finally {
+            if (changed) { // bump the version and persist only when at least one item was removed
+                this.dataVersion.nextVersion();
+                this.persist();
+                log.info("CleanItemExpired changed");
+            }
+            log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
+        }
+    }
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java b/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
index 7e66749..8b52a88 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
@@ -44,4 +44,13 @@ public class TopicOffset {
     public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
         this.lastUpdateTimestamp = lastUpdateTimestamp;
     }
+
+    @Override
+    public String toString() { // human-readable rendering of the three offset fields
+        return "TopicOffset{" +
+                "minOffset=" + minOffset +
+                ", maxOffset=" + maxOffset +
+                ", lastUpdateTimestamp=" + lastUpdateTimestamp +
+                '}';
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
index c4cf4de..8e921b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
@@ -17,11 +17,12 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetTopicStatsInfoRequestHeader implements CommandCustomHeader {
+public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String topic;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 47ffcc2..83f31e7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -1,8 +1,8 @@
 package org.apache.rocketmq.common.rpc;
 
-import com.alibaba.fastjson.JSON;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.util.concurrent.Promise;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -83,6 +83,9 @@ public class RpcClientImpl implements RpcClient {
                 case RequestCode.QUERY_CONSUMER_OFFSET:
                     rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
                     break;
+                case RequestCode.GET_TOPIC_STATS_INFO:
+                    rpcResponsePromise = handleGetTopicStats(addr, request, timeoutMs);
+                    break;
                 default:
                     throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
             }
@@ -209,6 +212,25 @@ public class RpcClientImpl implements RpcClient {
         return rpcResponsePromise;
     }
 
+    public Promise<RpcResponse> handleGetTopicStats(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { // sends the request to addr synchronously; the returned promise is already resolved
+        final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+        RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); // blocks up to timeoutMillis
+        assert responseCommand != null; // only enforced when the JVM runs with assertions enabled (-ea)
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                TopicStatsTable topicStatsTable = TopicStatsTable.decode(responseCommand.getBody(), TopicStatsTable.class); // body is decoded into a TopicStatsTable object
+                rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable));
+                break;
+            }
+            default:{
+                rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); // remote failure is carried inside the RpcResponse; the promise itself still succeeds
+            }
+        }
+        return rpcResponsePromise;
+    }
+
     public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
         final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index 5fcde36..2f61329 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -28,7 +28,7 @@ public class RpcResponse   {
 
     }
 
-    public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
+    public RpcResponse(int code, CommandCustomHeader header, Object body) {
         this.code = code;
         this.header = header;
         this.body = body;

[rocketmq] 01/02: Clean the items more than second gen

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 527382e4ce5cc03435b199f4b3c618be5590210c
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Dec 6 16:44:50 2021 +0800

    Clean the items more than second gen
---
 .../broker/processor/AdminBrokerProcessor.java     |  5 ++-
 .../broker/topic/TopicQueueMappingManager.java     | 50 +++++++++++++++++++++-
 .../statictopic/TopicQueueMappingDetail.java       | 13 ++++++
 .../common/statictopic/TopicQueueMappingInfo.java  | 12 ++++++
 .../namesrv/routeinfo/RouteInfoManager.java        |  1 +
 5 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a286818..20dfc85 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -353,13 +353,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         }
 
         this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
+        this.brokerController.getTopicQueueMappingManager().delete(topic);
+
         this.brokerController.getMessageStore()
             .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
         if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
             this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
         }
-        //TODO delete the topic route
-        //this.brokerController.getTopicQueueMappingManager()
+
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         return response;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 9be3717..c484bcf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -33,7 +33,9 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -61,6 +63,7 @@ public class TopicQueueMappingManager extends ConfigManager {
     public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception {
         boolean locked = false;
         boolean updated = false;
+        TopicQueueMappingDetail oldDetail = null;
         try {
             if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 locked = true;
@@ -74,7 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager {
                 TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
             });
 
-            TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
+            oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
             if (oldDetail == null) {
                 topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
                 updated = true;
@@ -115,11 +118,23 @@ public class TopicQueueMappingManager extends ConfigManager {
             }
             if (updated) {
                 this.persist();
+                log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force);
             }
         }
 
     }
 
+    public void delete(final String topic) { // removes the mapping for topic; version bump and persist happen only if one existed
+        TopicQueueMappingDetail old = this.topicQueueMappingTable.remove(topic);
+        if (old != null) {
+            log.info("delete topic queue mapping OK, topic queue mapping: {}", old);
+            this.dataVersion.nextVersion();
+            this.persist();
+        } else {
+            log.warn("delete topic queue mapping failed, topic: {} not exists", topic); // nothing was stored for this topic, so nothing is persisted
+        }
+    }
+
     public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
         return topicQueueMappingTable.get(topic);
     }
@@ -177,6 +192,9 @@ public class TopicQueueMappingManager extends ConfigManager {
             //it is not static topic
             return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
         }
+
+        assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName());
+
         //If not find mappingItem, it encounters some errors
         Integer globalId = requestHeader.getQueueId();
         if (globalId < 0 && !selectOneWhenMiss) {
@@ -224,4 +242,34 @@ public class TopicQueueMappingManager extends ConfigManager {
     }
 
 
+    public void cleanItemListMoreThanSecondGen() { // drops hosted-queue entries whose last two items both name other brokers
+        for(String topic : topicQueueMappingTable.keySet()) {
+            TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
+            if (mappingDetail == null
+                    || mappingDetail.getHostedQueues().isEmpty()) {
+                continue;
+            }
+            if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
+                log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
+                continue;
+            }
+            Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator(); // explicit iterator so entries can be removed while iterating
+            while (it.hasNext()) {
+                Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
+                Integer queueId = entry.getKey();
+                List<LogicQueueMappingItem> items = entry.getValue();
+                if (items.size() <= 2) { // entries with at most two items are always kept
+                    continue;
+                }
+                LogicQueueMappingItem leaderItem = items.get(items.size() - 1); // last item in the list; presumably the current mapping
+                LogicQueueMappingItem secLeaderItem = items.get(items.size() - 2); // second-to-last item
+                if (!leaderItem.getBname().equals(mappingDetail.getBname())
+                        && !secLeaderItem.getBname().equals(mappingDetail.getBname())) {
+                    it.remove(); // NOTE(review): no dataVersion.nextVersion()/persist() after removal, unlike delete() - confirm this is intended
+                    log.info("The topic queue {} {} is expired with items {}", mappingDetail.getTopic(), queueId, items);
+                }
+            }
+        }
+    }
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 1749b8e..f0f27a5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -128,4 +128,17 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
                 .append(hostedQueues)
                 .toHashCode();
     }
+
+    @Override
+    public String toString() { // renders hostedQueues followed by the topic/bname/epoch/dirty/currIdMap fields
+        return "TopicQueueMappingDetail{" +
+                "hostedQueues=" + hostedQueues +
+                ", topic='" + topic + '\'' +
+                ", totalQueues=" + totalQueues +
+                ", bname='" + bname + '\'' +
+                ", epoch=" + epoch +
+                ", dirty=" + dirty +
+                ", currIdMap=" + currIdMap +
+                '}';
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index 53041aa..a6a7eb5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -124,4 +124,16 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
                 .append(currIdMap)
                 .toHashCode();
     }
+
+    @Override
+    public String toString() { // human-readable rendering of the mapping info fields
+        return "TopicQueueMappingInfo{" +
+                "topic='" + topic + '\'' +
+                ", totalQueues=" + totalQueues +
+                ", bname='" + bname + '\'' +
+                ", epoch=" + epoch +
+                ", dirty=" + dirty +
+                ", currIdMap=" + currIdMap +
+                '}';
+    }
 }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 60a0f81..a02d3f1 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -169,6 +169,7 @@ public class RouteInfoManager {
                                 topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());
                             }
                             //Note asset brokerName equal entry.getValue().getBname()
+                            //here use the mappingDetail.bname
                             topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                         }
                     }