You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/07 08:46:51 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Add the register logic for mapping topic

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 7bbc7dd  Add the register logic for mapping topic
7bbc7dd is described below

commit 7bbc7ddfe92331da4386c6d1e2169325351b00fd
Author: dongeforever <do...@apache.org>
AuthorDate: Sun Nov 7 16:46:19 2021 +0800

    Add the register logic for mapping topic
---
 .../apache/rocketmq/broker/BrokerController.java   | 36 ++++++------
 .../rocketmq/broker/BrokerPathConfigHelper.java    |  2 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  3 +-
 .../rocketmq/broker/topic/TopicConfigManager.java  |  3 -
 .../broker/topic/TopicQueueMappingManager.java     |  5 ++
 .../common/TopicConfigAndQueueMapping.java         |  2 +-
 .../rocketmq/common/TopicQueueMappingInfo.java     | 64 ++++++++++++++++++++--
 .../common/protocol/body/RegisterBrokerBody.java   | 32 ++++++++++-
 .../TopicConfigAndMappingSerializeWrapper.java     | 45 +++++++++++++++
 .../protocol/body/TopicConfigSerializeWrapper.java | 10 ----
 10 files changed, 160 insertions(+), 42 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 27cba02..926c43b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -95,12 +95,14 @@ import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.common.stats.MomentStatsItem;
@@ -1058,7 +1060,7 @@ public class BrokerController {
             return;
         }
 
-        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+        TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
         topicConfigSerializeWrapper.setDataVersion(dataVersion);
 
         ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream()
@@ -1079,30 +1081,30 @@ public class BrokerController {
             .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
         topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
 
-        String brokerName = this.brokerConfig.getBrokerName();
-        Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigList.stream()
+        Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
             .map(TopicConfig::getTopicName)
-            .map(topicName -> Optional.ofNullable(this.topicConfigManager.selectLogicalQueuesInfo(topicName))
-                .map(info -> {
-                    info.readLock().lock();
-                    try {
-                        return new AbstractMap.SimpleImmutableEntry<>(topicName, new LogicalQueuesInfoInBroker(info, data -> Objects.equals(data.getBrokerName(), brokerName)));
-                    } finally {
-                        info.readLock().unlock();
-                    }
-                })
+            .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
+                .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.clone4register()))
                 .orElse(null))
             .filter(Objects::nonNull)
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        if (!logicalQueuesInfoMap.isEmpty()) {
-            topicConfigSerializeWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
+        if (!topicQueueMappingInfoMap.isEmpty()) {
+            topicConfigSerializeWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
         }
 
         doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
     }
 
     public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
-        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
+
+        TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
+
+        topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
+        topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
+
+        topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
+                entry ->  new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().clone4register())
+        ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
             || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
@@ -1121,9 +1123,9 @@ public class BrokerController {
                 }
             }
             topicConfigWrapper.setTopicConfigTable(topicConfigTable);
-            topicConfigWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
         }
 
+
         if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
             this.getBrokerAddr(),
             this.brokerConfig.getBrokerName(),
@@ -1134,7 +1136,7 @@ public class BrokerController {
     }
 
     private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
-        TopicConfigSerializeWrapper topicConfigWrapper) {
+        TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
         List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
             this.brokerConfig.getBrokerClusterName(),
             this.getBrokerAddr(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index e7a72e0..6360d54 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -36,7 +36,7 @@ public class BrokerPathConfigHelper {
     }
 
     public static String getTopicQueueMappingPath(final String rootDir) {
-        return rootDir + File.separator + "config" + File.separator + "topicqueuemapping.json";
+        return rootDir + File.separator + "config" + File.separator + "topicQueueMapping.json";
     }
 
     public static String getConsumerOffsetPath(final String rootDir) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 4d33663..a775864 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
@@ -140,7 +141,7 @@ public class BrokerOuterAPI {
             requestHeader.setCompressed(compressed);
 
             RegisterBrokerBody requestBody = new RegisterBrokerBody();
-            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
+            requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
             requestBody.setFilterServerList(filterServerList);
             final byte[] body = requestBody.encode(compressed);
             final int bodyCrc32 = UtilAll.crc32(body);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index bf69005..3780db2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -441,8 +441,6 @@ public class TopicConfigManager extends ConfigManager {
     public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
         TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
         topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
-        String brokerName = this.brokerController.getBrokerConfig().getBrokerName();
-        topicConfigSerializeWrapper.setLogicalQueuesInfoMap(this.logicalQueuesInfoTable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new LogicalQueuesInfoInBroker(e.getValue(), data -> Objects.equals(data.getBrokerName(), brokerName)))));
         topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
         return topicConfigSerializeWrapper;
     }
@@ -474,7 +472,6 @@ public class TopicConfigManager extends ConfigManager {
     public String encode(final boolean prettyFormat) {
         TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
         topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
-        topicConfigSerializeWrapper.setLogicalQueuesInfoMap(this.logicalQueuesInfoTable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new LogicalQueuesInfoInBroker(e.getValue()))));
         topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
         return topicConfigSerializeWrapper.toJson(prettyFormat);
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index c885b31..da7ea8f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -37,6 +37,7 @@ public class TopicQueueMappingManager extends ConfigManager {
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private transient final Lock lock = new ReentrantLock();
 
+    // this data version should be kept equal to the data version of the TopicConfigManager
     private final DataVersion dataVersion = new DataVersion();
     private transient BrokerController brokerController;
 
@@ -85,6 +86,10 @@ public class TopicQueueMappingManager extends ConfigManager {
         }
     }
 
+    public ConcurrentMap<String, TopicQueueMappingInfo> getTopicQueueMappingTable() {
+        return topicQueueMappingTable;
+    }
+
     public DataVersion getDataVersion() {
         return dataVersion;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
index f9a6ab4..7b1ea40 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
@@ -16,7 +16,7 @@
  */
 package org.apache.rocketmq.common;
 
-public class TopicConfigAndQueueMapping extends TopicConfig {
+public class TopicConfigAndQueueMapping {
     private TopicConfig topicConfig;
     private TopicQueueMappingInfo topicQueueMappingInfo;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 0956a99..85498df 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -16,39 +16,91 @@
  */
 package org.apache.rocketmq.common;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class TopicQueueMappingInfo extends RemotingSerializable {
+    public static final int LEVEL_0 = 0;
+    public static final int LEVEL_1 = 1;
 
     private String topic; // redundant field
     private int totalQueues;
-    private String bname;  //identify the host name
-    //the newest mapping is in current broker
-    private Map<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new HashMap<Integer, List<LogicQueueMappingItem>>();
-
+    private String bname;  //identify the hosted broker name
+    // the mapping info in current broker, do not register to nameserver
+    private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+    // registered to the nameserver to construct the route
+    private ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
+    // registered to the nameserver to help detect remapping failure
+    private ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
 
     public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
         this.topic = topic;
         this.totalQueues = totalQueues;
         this.bname = bname;
+        buildIdMap();
     }
 
-    public boolean putMappingInfo(Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
+    public boolean putMappingInfo(Integer globalId, ImmutableList<LogicQueueMappingItem> mappingInfo) {
         if (mappingInfo.isEmpty()) {
             return true;
         }
         hostedQueues.put(globalId, mappingInfo);
+        buildIdMap();
         return true;
     }
 
+    public void buildIdMap() {
+        this.currIdMap = buildIdMap(LEVEL_0);
+        this.prevIdMap = buildIdMap(LEVEL_1);
+    }
+
+    /**
+     * Builds a map from item queue id to global id out of {@code hostedQueues},
+     * keeping only items whose bname equals this object's {@code bname}.
+     *
+     * @param level LEVEL_0 reads the last item of each list (current leader), LEVEL_1 the one before it (previous leader)
+     * @return a newly built map; empty when {@code hostedQueues} is null or empty
+     */
+    public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
+        assert level == LEVEL_0 || level == LEVEL_1;
+        ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
+        if (hostedQueues == null || hostedQueues.isEmpty()) {
+            return tmpIdMap;
+        }
+        for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : hostedQueues.entrySet()) {
+            ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+            LogicQueueMappingItem item = null;
+            if (level == LEVEL_0 && items.size() >= 1) {
+                item = items.get(items.size() - 1);
+            } else if (level == LEVEL_1 && items.size() >= 2) {
+                // the previous leader is the second-to-last item, not the last one
+                item = items.get(items.size() - 2);
+            }
+            if (item != null && bname.equals(item.getBname())) {
+                tmpIdMap.put(item.getQueueId(), entry.getKey());
+            }
+        }
+        return tmpIdMap;
+    }
+
     public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
         return hostedQueues.get(globalId);
     }
 
+
+    public TopicQueueMappingInfo  clone4register() {
+        TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
+        topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
+        topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
+
+        return topicQueueMappingInfo;
+    }
+
     public int getTotalQueues() {
         return totalQueues;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index 4065c08..2d7597f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -33,6 +33,7 @@ import java.util.zip.InflaterInputStream;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -41,7 +42,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 public class RegisterBrokerBody extends RemotingSerializable {
 
     private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
-    private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+    private TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
     private List<String> filterServerList = new ArrayList<String>();
 
     public byte[] encode(boolean compress) {
@@ -82,6 +83,20 @@ public class RegisterBrokerBody extends RemotingSerializable {
             // write filter server list json
             outputStream.write(buffer);
 
+            //write the topic queue mapping
+            Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigSerializeWrapper.getTopicQueueMappingInfoMap();
+            if (topicQueueMappingInfoMap == null) {
+                //as the place holder
+                topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
+            }
+            outputStream.write(convertIntToByteArray(topicQueueMappingInfoMap.size()));
+            for (TopicQueueMappingInfo info: topicQueueMappingInfoMap.values()) {
+                buffer = JSON.toJSONString(info).getBytes(MixAll.DEFAULT_CHARSET);
+                outputStream.write(convertIntToByteArray(buffer.length));
+                // write one topic queue mapping info json
+                outputStream.write(buffer);
+            }
+
             outputStream.finish();
             long interval = System.currentTimeMillis() - start;
             if (interval > 50) {
@@ -134,6 +149,17 @@ public class RegisterBrokerBody extends RemotingSerializable {
         }
 
         registerBrokerBody.setFilterServerList(filterServerList);
+
+        int topicQueueMappingNum =  readInt(inflaterInputStream);
+        Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
+        for (int i = 0; i < topicQueueMappingNum; i++) {
+            int mappingJsonLen = readInt(inflaterInputStream);
+            byte[] buffer = readBytes(inflaterInputStream, mappingJsonLen);
+            TopicQueueMappingInfo info = TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class);
+            topicQueueMappingInfoMap.put(info.getTopic(), info);
+        }
+        registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
+
         long interval = System.currentTimeMillis() - start;
         if (interval > 50) {
             LOGGER.info("Decompressing takes {}ms", interval);
@@ -167,11 +193,11 @@ public class RegisterBrokerBody extends RemotingSerializable {
         return byteBuffer.getInt();
     }
 
-    public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
+    public TopicConfigAndMappingSerializeWrapper getTopicConfigSerializeWrapper() {
         return topicConfigSerializeWrapper;
     }
 
-    public void setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConfigSerializeWrapper) {
+    public void setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper) {
         this.topicConfigSerializeWrapper = topicConfigSerializeWrapper;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
new file mode 100644
index 0000000..e6a34c4
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeWrapper {
+    private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
+
+    public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
+        return topicQueueMappingInfoMap;
+    }
+
+    public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap) {
+        this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
+    }
+
+    public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) {
+        if (wrapper instanceof  TopicConfigAndMappingSerializeWrapper) {
+            return (TopicConfigAndMappingSerializeWrapper)wrapper;
+        }
+        TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper =  new TopicConfigAndMappingSerializeWrapper();
+        mappingSerializeWrapper.setDataVersion(wrapper.getDataVersion());
+        mappingSerializeWrapper.setTopicConfigTable(wrapper.getTopicConfigTable());
+        return mappingSerializeWrapper;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
index 1389663..1176e6b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -22,13 +22,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class TopicConfigSerializeWrapper extends RemotingSerializable {
     private ConcurrentMap<String, TopicConfig> topicConfigTable =
         new ConcurrentHashMap<String, TopicConfig>();
-    private Map<String/* topic */, LogicalQueuesInfo> logicalQueuesInfoMap;
     private DataVersion dataVersion = new DataVersion();
 
     public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
@@ -46,12 +44,4 @@ public class TopicConfigSerializeWrapper extends RemotingSerializable {
     public void setDataVersion(DataVersion dataVersion) {
         this.dataVersion = dataVersion;
     }
-
-    public Map<String, LogicalQueuesInfo> getLogicalQueuesInfoMap() {
-        return logicalQueuesInfoMap;
-    }
-
-    public void setLogicalQueuesInfoMap(Map<String, LogicalQueuesInfo> logicalQueuesInfoMap) {
-        this.logicalQueuesInfoMap = logicalQueuesInfoMap;
-    }
 }