You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/06 12:27:53 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated (12915b8 -> c06564f)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a change to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


    from 12915b8  Add clean item logic for topic queue mapping
     new 9cae8c1  Add decode encode test for topic queue manager
     new c06564f  Try polishing the clear logic, need more polishment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../broker/topic/TopicQueueMappingManager.java     | 164 ++++++++++++++-------
 .../broker/topic/TopicQueueMappingManagerTest.java | 112 ++++++++++++++
 .../header/GetTopicConfigRequestHeader.java        |   3 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |  18 ++-
 4 files changed, 236 insertions(+), 61 deletions(-)
 create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java

[rocketmq] 02/02: Try polishing the clear logic, need more polishment

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit c06564f68b2eb90b2774c736241faa15ac924c85
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Dec 6 20:27:16 2021 +0800

    Try polishing the clear logic, need more polishment
---
 .../broker/topic/TopicQueueMappingManager.java     | 161 ++++++++++++++-------
 .../header/GetTopicConfigRequestHeader.java        |   3 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |  18 ++-
 3 files changed, 121 insertions(+), 61 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 1c11fde..c442040 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -26,10 +26,12 @@ import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
 import org.apache.rocketmq.common.rpc.RpcRequest;
 import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -258,45 +260,13 @@ public class TopicQueueMappingManager extends ConfigManager {
 
 
     public void cleanItemListMoreThanSecondGen() {
-        for(String topic : topicQueueMappingTable.keySet()) {
-            TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
-            if (mappingDetail == null
-                    || mappingDetail.getHostedQueues().isEmpty()) {
-                continue;
-            }
-            if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
-                log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
-                continue;
-            }
-            Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
-            while (it.hasNext()) {
-                Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
-                Integer queueId = entry.getKey();
-                List<LogicQueueMappingItem> items = entry.getValue();
-                if (items.size() <= 2) {
-                    continue;
-                }
-                LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
-                LogicQueueMappingItem secLeaderItem = items.get(items.size() - 2);
-                if (!leaderItem.getBname().equals(mappingDetail.getBname())
-                        && !secLeaderItem.getBname().equals(mappingDetail.getBname())) {
-                    it.remove();
-                    log.info("The topic queue {} {} is expired with items {}", mappingDetail.getTopic(), queueId, items);
-                }
-            }
-        }
-    }
-
-
-    public void cleanItemExpired() {
         String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
         if (!UtilAll.isItTimeToDo(when)) {
             return;
         }
-        boolean changed = false;
-        long start = System.currentTimeMillis();
-        try {
-            for(String topic : topicQueueMappingTable.keySet()) {
+
+        for(String topic : topicQueueMappingTable.keySet()) {
+            try {
                 TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
                 if (mappingDetail == null
                         || mappingDetail.getHostedQueues().isEmpty()) {
@@ -307,51 +277,132 @@ public class TopicQueueMappingManager extends ConfigManager {
                     continue;
                 }
                 Set<String> brokers = new HashSet<>();
-                for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+                for (List<LogicQueueMappingItem> items : mappingDetail.getHostedQueues().values()) {
                     if (items.size() < 2) {
                         continue;
                     }
-                    LogicQueueMappingItem earlistItem = items.get(0);
-                    brokers.add(earlistItem.getBname());
+                    LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+                    if (!leaderItem.equals(mappingDetail.getBname())) {
+                        brokers.add(leaderItem.getBname());
+                    }
                 }
-                Map<String, TopicStatsTable> statsTable = new HashMap<>();
+                if (brokers.isEmpty()) {
+                    continue;
+                }
+                Map<String, TopicConfigAndQueueMapping> configAndQueueMappingMap = new HashMap<>();
                 for (String broker: brokers) {
-                    GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+                    GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
                     header.setTopic(topic);
                     header.setBname(broker);
                     try {
-                        RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+                        RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
                         RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
                         if (rpcResponse.getException() != null) {
                             throw rpcResponse.getException();
                         }
-                        statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+                        configAndQueueMappingMap.put(broker, (TopicConfigAndQueueMapping) rpcResponse.getBody());
                     } catch (Throwable rt) {
                         log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
                     }
                 }
-                for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+
+                Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
+                while (it.hasNext()) {
+                    Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
+                    Integer queueId = entry.getKey();
+                    List<LogicQueueMappingItem> items = entry.getValue();
                     if (items.size() < 2) {
                         continue;
                     }
-                    LogicQueueMappingItem earlistItem = items.get(0);
-                    TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
-                    if (topicStats == null) {
+                    LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+
+                    TopicConfigAndQueueMapping configAndQueueMapping =  configAndQueueMappingMap.get(leaderItem.getBname());
+                    if (configAndQueueMapping == null) {
                         continue;
                     }
-                    TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
-                    if (topicOffset == null) {
-                        //this may should not happen
-                        log.warn("Get null topicOffset for {}", earlistItem);
+                    List<LogicQueueMappingItem> itemsRemote = configAndQueueMapping.getMappingDetail().getHostedQueues().get(queueId);
+                    //TODO
+                }
+            } catch (Throwable tt) {
+                log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
+            } finally {
+                UtilAll.sleep(10);
+            }
+        }
+    }
+
+
+    public void cleanItemExpired() {
+        String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
+        if (!UtilAll.isItTimeToDo(when)) {
+            return;
+        }
+        boolean changed = false;
+        long start = System.currentTimeMillis();
+        try {
+            for(String topic : topicQueueMappingTable.keySet()) {
+                try {
+                    TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
+                    if (mappingDetail == null
+                            || mappingDetail.getHostedQueues().isEmpty()) {
+                        continue;
+                    }
+                    if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
+                        log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
                         continue;
                     }
-                    if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
-                        boolean result = items.remove(earlistItem);
-                        changed = changed || result;
-                        log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+                    Set<String> brokers = new HashSet<>();
+                    for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+                        if (items.size() < 2) {
+                            continue;
+                        }
+                        LogicQueueMappingItem earlistItem = items.get(0);
+                        brokers.add(earlistItem.getBname());
+                    }
+                    Map<String, TopicStatsTable> statsTable = new HashMap<>();
+                    for (String broker: brokers) {
+                        GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+                        header.setTopic(topic);
+                        header.setBname(broker);
+                        try {
+                            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+                            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+                            if (rpcResponse.getException() != null) {
+                                throw rpcResponse.getException();
+                            }
+                            statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+                        } catch (Throwable rt) {
+                            log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+                        }
+                    }
+                    for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+                        if (items.size() < 2) {
+                            continue;
+                        }
+                        LogicQueueMappingItem earlistItem = items.get(0);
+                        TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
+                        if (topicStats == null) {
+                            continue;
+                        }
+                        TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
+                        if (topicOffset == null) {
+                            //this may should not happen
+                            log.warn("Get null topicOffset for {}", earlistItem);
+                            continue;
+                        }
+                        if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
+                            //TODO be careful of the concurrent problem
+                            //Should use the lock
+                            boolean result = items.remove(earlistItem);
+                            changed = changed || result;
+                            log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+                        }
                     }
+                } catch (Throwable tt) {
+                    log.error("Try CleanItemExpired failed for {}", topic, tt);
+                } finally {
+                    UtilAll.sleep(10);
                 }
-                UtilAll.sleep(10);
             }
         } catch (Throwable t) {
             log.error("Try cleanItemExpired failed", t);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
index 2b5d040..b282efa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
@@ -17,11 +17,12 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetTopicConfigRequestHeader implements CommandCustomHeader {
+public class GetTopicConfigRequestHeader extends RpcRequestHeader {
     @Override
     public void checkFields() throws RemotingCommandException {
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 83f31e7..6d75df9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -2,6 +2,7 @@ package org.apache.rocketmq.common.rpc;
 
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.util.concurrent.Promise;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -9,13 +10,19 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -84,7 +91,10 @@ public class RpcClientImpl implements RpcClient {
                     rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
                     break;
                 case RequestCode.GET_TOPIC_STATS_INFO:
-                    rpcResponsePromise = handleGetTopicStats(addr, request, timeoutMs);
+                    rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicStatsTable.class);
+                    break;
+                case RequestCode.GET_TOPIC_CONFIG:
+                    rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicConfigAndQueueMapping.class);
                     break;
                 default:
                     throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
@@ -212,16 +222,14 @@ public class RpcClientImpl implements RpcClient {
         return rpcResponsePromise;
     }
 
-    public Promise<RpcResponse> handleGetTopicStats(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+    public Promise<RpcResponse> handleCommonBodyRequest(final String addr, RpcRequest rpcRequest, long timeoutMillis, Class bodyClass) throws Exception {
         final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
-
         RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
         RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
         assert responseCommand != null;
         switch (responseCommand.getCode()) {
             case ResponseCode.SUCCESS: {
-                TopicStatsTable topicStatsTable = TopicStatsTable.decode(responseCommand.getBody(), TopicStatsTable.class);
-                rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable));
+                rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(requestCommand.getBody(), bodyClass)));
                 break;
             }
             default:{

[rocketmq] 01/02: Add decode encode test for topic queue manager

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 9cae8c1bf02afa5788435ea3360aeb241443515c
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Dec 6 19:50:28 2021 +0800

    Add decode encode test for topic queue manager
---
 .../broker/topic/TopicQueueMappingManager.java     |   3 +
 .../broker/topic/TopicQueueMappingManagerTest.java | 112 +++++++++++++++++++++
 2 files changed, 115 insertions(+)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 9be442e..1c11fde 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -77,6 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager {
         boolean updated = false;
         TopicQueueMappingDetail oldDetail = null;
         try {
+
             if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 locked = true;
             } else {
@@ -85,6 +86,8 @@ public class TopicQueueMappingManager extends ConfigManager {
             if (newDetail == null) {
                 return;
             }
+            assert newDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName());
+
             newDetail.getHostedQueues().forEach((queueId, items) -> {
                 TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
             });
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
new file mode 100644
index 0000000..3c3f488
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TopicQueueMappingManagerTest {
+    @Mock
+    private BrokerController brokerController;
+    private static final String broker1Name = "broker1";
+
+    @Before
+    public void before() {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setBrokerName(broker1Name);
+        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir"));
+        messageStoreConfig.setDeleteWhen("01;02;03;04;05;06;07;08;09;10;11;12;13;14;15;16;17;18;19;20;21;22;23;00");
+        when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+    }
+
+
+    private void delete(TopicQueueMappingManager topicQueueMappingManager) throws Exception {
+        if (topicQueueMappingManager == null) {
+            return;
+        }
+        Files.deleteIfExists(Paths.get(topicQueueMappingManager.configFilePath()));
+        Files.deleteIfExists(Paths.get(topicQueueMappingManager.configFilePath() + ".bak"));
+
+
+    }
+
+    @Test
+    public void testEncodeDecode() throws Exception {
+        Map<String, TopicQueueMappingDetail> mappingDetailMap = new HashMap<>();
+        TopicQueueMappingManager topicQueueMappingManager = null;
+        Set<String> brokers = new HashSet<String>();
+        brokers.add(broker1Name);
+        {
+            for (int i = 0; i < 10; i++) {
+                String topic = UUID.randomUUID().toString();
+                int queueNum = 10;
+                TopicRemappingDetailWrapper topicRemappingDetailWrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, brokers, new HashMap<>());
+                Assert.assertEquals(1, topicRemappingDetailWrapper.getBrokerConfigMap().size());
+                TopicQueueMappingDetail topicQueueMappingDetail  = topicRemappingDetailWrapper.getBrokerConfigMap().values().iterator().next().getMappingDetail();
+                Assert.assertEquals(queueNum, topicQueueMappingDetail.getHostedQueues().size());
+                mappingDetailMap.put(topic, topicQueueMappingDetail);
+            }
+        }
+
+        {
+            topicQueueMappingManager = new TopicQueueMappingManager(brokerController);
+            Assert.assertTrue(topicQueueMappingManager.load());
+            Assert.assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size());
+            for (TopicQueueMappingDetail mappingDetail : mappingDetailMap.values()) {
+                for (int i = 0; i < 10; i++) {
+                    topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false);
+                }
+            }
+            topicQueueMappingManager.persist();
+        }
+
+        {
+            topicQueueMappingManager = new TopicQueueMappingManager(brokerController);
+            Assert.assertTrue(topicQueueMappingManager.load());
+            Assert.assertEquals(mappingDetailMap.size(), topicQueueMappingManager.getTopicQueueMappingTable().size());
+            for (TopicQueueMappingDetail topicQueueMappingDetail: topicQueueMappingManager.getTopicQueueMappingTable().values()) {
+                Assert.assertEquals(topicQueueMappingDetail, mappingDetailMap.get(topicQueueMappingDetail.getTopic()));
+            }
+        }
+        delete(topicQueueMappingManager);
+    }
+}
\ No newline at end of file