You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/07 08:46:51 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Add the
register logic for mapping topic
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 7bbc7dd Add the register logic for mapping topic
7bbc7dd is described below
commit 7bbc7ddfe92331da4386c6d1e2169325351b00fd
Author: dongeforever <do...@apache.org>
AuthorDate: Sun Nov 7 16:46:19 2021 +0800
Add the register logic for mapping topic
---
.../apache/rocketmq/broker/BrokerController.java | 36 ++++++------
.../rocketmq/broker/BrokerPathConfigHelper.java | 2 +-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 3 +-
.../rocketmq/broker/topic/TopicConfigManager.java | 3 -
.../broker/topic/TopicQueueMappingManager.java | 5 ++
.../common/TopicConfigAndQueueMapping.java | 2 +-
.../rocketmq/common/TopicQueueMappingInfo.java | 64 ++++++++++++++++++++--
.../common/protocol/body/RegisterBrokerBody.java | 32 ++++++++++-
.../TopicConfigAndMappingSerializeWrapper.java | 45 +++++++++++++++
.../protocol/body/TopicConfigSerializeWrapper.java | 10 ----
10 files changed, 160 insertions(+), 42 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 27cba02..926c43b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -95,12 +95,14 @@ import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.stats.MomentStatsItem;
@@ -1058,7 +1060,7 @@ public class BrokerController {
return;
}
- TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+ TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream()
@@ -1079,30 +1081,30 @@ public class BrokerController {
.collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
- String brokerName = this.brokerConfig.getBrokerName();
- Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigList.stream()
+ Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
.map(TopicConfig::getTopicName)
- .map(topicName -> Optional.ofNullable(this.topicConfigManager.selectLogicalQueuesInfo(topicName))
- .map(info -> {
- info.readLock().lock();
- try {
- return new AbstractMap.SimpleImmutableEntry<>(topicName, new LogicalQueuesInfoInBroker(info, data -> Objects.equals(data.getBrokerName(), brokerName)));
- } finally {
- info.readLock().unlock();
- }
- })
+ .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
+ .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.clone4register()))
.orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- if (!logicalQueuesInfoMap.isEmpty()) {
- topicConfigSerializeWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
+ if (!topicQueueMappingInfoMap.isEmpty()) {
+ topicConfigSerializeWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
}
doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
- TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
+
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
+
+ topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
+ topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
+
+ topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
+ entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().clone4register())
+ ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
@@ -1121,9 +1123,9 @@ public class BrokerController {
}
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
- topicConfigWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
}
+
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
@@ -1134,7 +1136,7 @@ public class BrokerController {
}
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
- TopicConfigSerializeWrapper topicConfigWrapper) {
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index e7a72e0..6360d54 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -36,7 +36,7 @@ public class BrokerPathConfigHelper {
}
public static String getTopicQueueMappingPath(final String rootDir) {
- return rootDir + File.separator + "config" + File.separator + "topicqueuemapping.json";
+ return rootDir + File.separator + "config" + File.separator + "topicQueueMapping.json";
}
public static String getConsumerOffsetPath(final String rootDir) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 4d33663..a775864 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
@@ -140,7 +141,7 @@ public class BrokerOuterAPI {
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
- requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
+ requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index bf69005..3780db2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -441,8 +441,6 @@ public class TopicConfigManager extends ConfigManager {
public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
- String brokerName = this.brokerController.getBrokerConfig().getBrokerName();
- topicConfigSerializeWrapper.setLogicalQueuesInfoMap(this.logicalQueuesInfoTable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new LogicalQueuesInfoInBroker(e.getValue(), data -> Objects.equals(data.getBrokerName(), brokerName)))));
topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
return topicConfigSerializeWrapper;
}
@@ -474,7 +472,6 @@ public class TopicConfigManager extends ConfigManager {
public String encode(final boolean prettyFormat) {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
- topicConfigSerializeWrapper.setLogicalQueuesInfoMap(this.logicalQueuesInfoTable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new LogicalQueuesInfoInBroker(e.getValue()))));
topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
return topicConfigSerializeWrapper.toJson(prettyFormat);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index c885b31..da7ea8f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -37,6 +37,7 @@ public class TopicQueueMappingManager extends ConfigManager {
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private transient final Lock lock = new ReentrantLock();
+ //this data version should be equal to the TopicConfigManager
private final DataVersion dataVersion = new DataVersion();
private transient BrokerController brokerController;
@@ -85,6 +86,10 @@ public class TopicQueueMappingManager extends ConfigManager {
}
}
+ public ConcurrentMap<String, TopicQueueMappingInfo> getTopicQueueMappingTable() {
+ return topicQueueMappingTable;
+ }
+
public DataVersion getDataVersion() {
return dataVersion;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
index f9a6ab4..7b1ea40 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.common;
-public class TopicConfigAndQueueMapping extends TopicConfig {
+public class TopicConfigAndQueueMapping {
private TopicConfig topicConfig;
private TopicQueueMappingInfo topicQueueMappingInfo;
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 0956a99..85498df 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -16,39 +16,91 @@
*/
package org.apache.rocketmq.common;
+import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class TopicQueueMappingInfo extends RemotingSerializable {
+ public static final int LEVEL_0 = 0;
+ public static final int LEVEL_1 = 1;
private String topic; // redundant field
private int totalQueues;
- private String bname; //identify the host name
- //the newest mapping is in current broker
- private Map<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new HashMap<Integer, List<LogicQueueMappingItem>>();
-
+ private String bname; //identify the hosted broker name
+ // the mapping info in current broker, do not register to nameserver
+ private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+ //register to broker to construct the route
+ private ConcurrentMap<Integer, Integer> currIdMap = new ConcurrentHashMap<Integer, Integer>();
+ //register to broker to help detect remapping failure
+ private ConcurrentMap<Integer, Integer> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
this.topic = topic;
this.totalQueues = totalQueues;
this.bname = bname;
+ buildIdMap();
}
- public boolean putMappingInfo(Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
+ public boolean putMappingInfo(Integer globalId, ImmutableList<LogicQueueMappingItem> mappingInfo) {
if (mappingInfo.isEmpty()) {
return true;
}
hostedQueues.put(globalId, mappingInfo);
+ buildIdMap();
return true;
}
+ public void buildIdMap() {
+ this.currIdMap = buildIdMap(LEVEL_0);
+ this.prevIdMap = buildIdMap(LEVEL_1);
+ }
+
+ public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
+ //level 0 means current leader in this broker
+ //level 1 means previous leader in this broker
+ assert level == LEVEL_0 || level == LEVEL_1;
+
+ if (hostedQueues == null || hostedQueues.isEmpty()) {
+ return new ConcurrentHashMap<Integer, Integer>();
+ }
+ ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
+ for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry: hostedQueues.entrySet()) {
+ Integer globalId = entry.getKey();
+ ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+ if (level == LEVEL_0
+ && items.size() >= 1) {
+ LogicQueueMappingItem curr = items.get(items.size() - 1);
+ if (bname.equals(curr.getBname())) {
+ tmpIdMap.put(curr.getQueueId(), globalId);
+ }
+ } else if (level == LEVEL_1
+ && items.size() >= 2) {
+ LogicQueueMappingItem prev = items.get(items.size() - 1);
+ if (bname.equals(prev.getBname())) {
+ tmpIdMap.put(prev.getQueueId(), globalId);
+ }
+ }
+ }
+ return tmpIdMap;
+ }
+
public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
return hostedQueues.get(globalId);
}
+
+ public TopicQueueMappingInfo clone4register() {
+ TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
+ topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
+ topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
+
+ return topicQueueMappingInfo;
+ }
+
public int getTotalQueues() {
return totalQueues;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index 4065c08..2d7597f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -33,6 +33,7 @@ import java.util.zip.InflaterInputStream;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -41,7 +42,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class RegisterBrokerBody extends RemotingSerializable {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
- private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+ private TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
private List<String> filterServerList = new ArrayList<String>();
public byte[] encode(boolean compress) {
@@ -82,6 +83,20 @@ public class RegisterBrokerBody extends RemotingSerializable {
// write filter server list json
outputStream.write(buffer);
+ //write the topic queue mapping
+ Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigSerializeWrapper.getTopicQueueMappingInfoMap();
+ if (topicQueueMappingInfoMap == null) {
+ //as the place holder
+ topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
+ }
+ outputStream.write(convertIntToByteArray(topicQueueMappingInfoMap.size()));
+ for (TopicQueueMappingInfo info: topicQueueMappingInfoMap.values()) {
+ buffer = JSON.toJSONString(info).getBytes(MixAll.DEFAULT_CHARSET);
+ outputStream.write(convertIntToByteArray(buffer.length));
+ // write filter server list json
+ outputStream.write(buffer);
+ }
+
outputStream.finish();
long interval = System.currentTimeMillis() - start;
if (interval > 50) {
@@ -134,6 +149,17 @@ public class RegisterBrokerBody extends RemotingSerializable {
}
registerBrokerBody.setFilterServerList(filterServerList);
+
+ int topicQueueMappingNum = readInt(inflaterInputStream);
+ Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
+ for (int i = 0; i < topicQueueMappingNum; i++) {
+ int mappingJsonLen = readInt(inflaterInputStream);
+ byte[] buffer = readBytes(inflaterInputStream, mappingJsonLen);
+ TopicQueueMappingInfo info = TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class);
+ topicQueueMappingInfoMap.put(info.getTopic(), info);
+ }
+ registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
+
long interval = System.currentTimeMillis() - start;
if (interval > 50) {
LOGGER.info("Decompressing takes {}ms", interval);
@@ -167,11 +193,11 @@ public class RegisterBrokerBody extends RemotingSerializable {
return byteBuffer.getInt();
}
- public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
+ public TopicConfigAndMappingSerializeWrapper getTopicConfigSerializeWrapper() {
return topicConfigSerializeWrapper;
}
- public void setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConfigSerializeWrapper) {
+ public void setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper) {
this.topicConfigSerializeWrapper = topicConfigSerializeWrapper;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
new file mode 100644
index 0000000..e6a34c4
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeWrapper {
+ private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
+
+ public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
+ return topicQueueMappingInfoMap;
+ }
+
+ public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap) {
+ this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
+ }
+
+ public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) {
+ if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) {
+ return (TopicConfigAndMappingSerializeWrapper)wrapper;
+ }
+ TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
+ mappingSerializeWrapper.setDataVersion(wrapper.getDataVersion());
+ mappingSerializeWrapper.setTopicConfigTable(wrapper.getTopicConfigTable());
+ return mappingSerializeWrapper;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
index 1389663..1176e6b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -22,13 +22,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>();
- private Map<String/* topic */, LogicalQueuesInfo> logicalQueuesInfoMap;
private DataVersion dataVersion = new DataVersion();
public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
@@ -46,12 +44,4 @@ public class TopicConfigSerializeWrapper extends RemotingSerializable {
public void setDataVersion(DataVersion dataVersion) {
this.dataVersion = dataVersion;
}
-
- public Map<String, LogicalQueuesInfo> getLogicalQueuesInfoMap() {
- return logicalQueuesInfoMap;
- }
-
- public void setLogicalQueuesInfoMap(Map<String, LogicalQueuesInfo> logicalQueuesInfoMap) {
- this.logicalQueuesInfoMap = logicalQueuesInfoMap;
- }
}