You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/06 09:41:26 UTC

[rocketmq] branch 5.0.0-alpha-static-topic created (now 3e2c920)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a change to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


      at 3e2c920  Add update_static_topic code

This branch includes the following new commits:

     new e862ac8  Add definition for logic queue
     new 3e2c920  Add update_static_topic code

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[rocketmq] 02/02: Add update_static_topic code

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 3e2c9202392c9a13259662023bbcb997c9770f7f
Author: dongeforever <do...@apache.org>
AuthorDate: Sat Nov 6 17:39:57 2021 +0800

    Add update_static_topic code
---
 .../apache/rocketmq/broker/BrokerController.java   |  9 +++
 .../broker/processor/AdminBrokerProcessor.java     | 75 +++++++++++++---------
 .../broker/topic/TopicQueueMappingManager.java     |  9 ++-
 .../route => }/LogicQueueMappingItem.java          |  2 +-
 ...Header.java => TopicConfigAndQueueMapping.java} | 34 ++++------
 .../route => }/TopicQueueMappingInfo.java          | 14 ++--
 .../rocketmq/common/protocol/RequestCode.java      |  4 ++
 .../TopicQueueMappingBody.java}                    | 31 ++-------
 .../body/TopicQueueMappingSerializeWrapper.java    |  2 +-
 .../header/GetTopicConfigRequestHeader.java        | 10 +++
 10 files changed, 106 insertions(+), 84 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 5eb9169..27cba02 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -80,6 +80,7 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor;
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
 import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
@@ -179,6 +180,7 @@ public class BrokerController {
     private RemotingServer remotingServer;
     private RemotingServer fastRemotingServer;
     private TopicConfigManager topicConfigManager;
+    private TopicQueueMappingManager topicQueueMappingManager;
     private ExecutorService sendMessageExecutor;
     private ExecutorService pullMessageExecutor;
     private ExecutorService ackMessageExecutor;
@@ -215,6 +217,7 @@ public class BrokerController {
         this.messageStoreConfig = messageStoreConfig;
         this.consumerOffsetManager = new ConsumerOffsetManager(this);
         this.topicConfigManager = new TopicConfigManager(this);
+        this.topicQueueMappingManager = new TopicQueueMappingManager(this);
         this.pullMessageProcessor = new PullMessageProcessor(this);
         this.pullRequestHoldService = new PullRequestHoldService(this);
         this.popMessageProcessor = new PopMessageProcessor(this);
@@ -287,6 +290,8 @@ public class BrokerController {
     public boolean initialize() throws CloneNotSupportedException {
         boolean result = this.topicConfigManager.load();
 
+        result = result && this.topicQueueMappingManager.load();
+
         result = result && this.consumerOffsetManager.load();
         result = result && this.subscriptionGroupManager.load();
         result = result && this.consumerFilterManager.load();
@@ -1184,6 +1189,10 @@ public class BrokerController {
         this.topicConfigManager = topicConfigManager;
     }
 
+    public TopicQueueMappingManager getTopicQueueMappingManager() {
+        return topicQueueMappingManager;
+    }
+
     public String getHAServerAddr() {
         return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index e7b7949..99c7031 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -54,13 +54,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
-import org.apache.rocketmq.common.AclConfig;
-import org.apache.rocketmq.common.MQVersion;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.PlainAccessConfig;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.TopicQueueId;
-import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.*;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.admin.TopicOffset;
@@ -74,28 +68,7 @@ import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
-import org.apache.rocketmq.common.protocol.body.BrokerStatsItem;
-import org.apache.rocketmq.common.protocol.body.Connection;
-import org.apache.rocketmq.common.protocol.body.ConsumeQueueData;
-import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
-import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
-import org.apache.rocketmq.common.protocol.body.CreateMessageQueueForLogicalQueueRequestBody;
-import org.apache.rocketmq.common.protocol.body.GroupList;
-import org.apache.rocketmq.common.protocol.body.KVTable;
-import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
-import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
-import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
-import org.apache.rocketmq.common.protocol.body.ProducerConnection;
-import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
-import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
-import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
-import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
-import org.apache.rocketmq.common.protocol.body.ReuseTopicLogicalQueueRequestBody;
-import org.apache.rocketmq.common.protocol.body.SealTopicLogicalQueueRequestBody;
-import org.apache.rocketmq.common.protocol.body.TopicList;
-import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
-import org.apache.rocketmq.common.protocol.body.UpdateTopicLogicalQueueMappingRequestBody;
+import org.apache.rocketmq.common.protocol.body.*;
 import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
@@ -282,6 +255,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 return migrateTopicLogicalQueueCommit(ctx, request);
             case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY:
                 return migrateTopicLogicalQueueNotify(ctx, request);
+            case RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC:
+                return this.updateAndCreateStaticTopic(ctx, request);
             default:
                 return getUnknownCmdResponse(ctx, request);
         }
@@ -323,6 +298,42 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
+    private synchronized RemotingCommand updateAndCreateStaticTopic(ChannelHandlerContext ctx,
+                                                              RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final CreateTopicRequestHeader requestHeader =
+                (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
+        log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        final TopicQueueMappingBody topicQueueMappingBody = RemotingSerializable.decode(request.getBody(), TopicQueueMappingBody.class);
+
+        String topic = requestHeader.getTopic();
+
+        if (!TopicValidator.validateTopic(topic, response)) {
+            return response;
+        }
+        if (TopicValidator.isSystemTopic(topic, response)) {
+            return response;
+        }
+
+        TopicConfig topicConfig = new TopicConfig(topic);
+        topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
+        topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
+        topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
+        topicConfig.setPerm(requestHeader.getPerm());
+        topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
+
+        this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
+
+        this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody);
+
+        this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+
+        response.setCode(ResponseCode.SUCCESS);
+        return response;
+    }
+
+
     private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -1715,7 +1726,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
             return response;
         }
-        String content = JSONObject.toJSONString(topicConfig);
+        TopicQueueMappingInfo topicQueueMappingInfo = null;
+        if (Boolean.TRUE.equals(requestHeader.getWithMapping())) {
+            topicQueueMappingInfo = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
+        }
+        String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingInfo));
         try {
             response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
         } catch (UnsupportedEncodingException e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 9ee0f51..c885b31 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
-import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
@@ -45,7 +45,14 @@ public class TopicQueueMappingManager extends ConfigManager {
 
     public TopicQueueMappingManager(BrokerController brokerController) {
         this.brokerController = brokerController;
+    }
+
+    public void updateTopicQueueMapping(TopicQueueMappingInfo topicQueueMappingInfo) {
+        topicQueueMappingTable.put(topicQueueMappingInfo.getTopic(), topicQueueMappingInfo);
+    }
 
+    public TopicQueueMappingInfo getTopicQueueMapping(String topic) {
+        return topicQueueMappingTable.get(topic);
     }
 
     @Override
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
similarity index 95%
rename from common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java
rename to common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index fc5cbe6..50d88ae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.common.protocol.route;
+package org.apache.rocketmq.common;
 
 public class LogicQueueMappingItem {
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
similarity index 55%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
index ea9d17c..f9a6ab4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
@@ -14,32 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.common;
 
-package org.apache.rocketmq.common.protocol.header;
+public class TopicConfigAndQueueMapping extends TopicConfig {
+    private TopicConfig topicConfig;
+    private TopicQueueMappingInfo topicQueueMappingInfo;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class GetTopicConfigRequestHeader implements CommandCustomHeader {
-    @Override
-    public void checkFields() throws RemotingCommandException {
+    public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingInfo topicQueueMappingInfo) {
+        this.topicConfig = topicConfig;
+        this.topicQueueMappingInfo = topicQueueMappingInfo;
     }
 
-    @CFNotNull
-    private String topic;
-
-    /**
-     * @return the topic
-     */
-    public String getTopic() {
-        return topic;
+    public TopicQueueMappingInfo getTopicQueueMappingInfo() {
+        return topicQueueMappingInfo;
     }
 
-    /**
-     * @param topic the topic to set
-     */
-    public void setTopic(String topic) {
-        this.topic = topic;
+    public TopicConfig getTopicConfig() {
+        return topicConfig;
     }
-}
\ No newline at end of file
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
similarity index 82%
rename from common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java
rename to common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 0376965..0956a99 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -14,21 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.common.protocol.route;
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class TopicQueueMappingInfo {
+public class TopicQueueMappingInfo extends RemotingSerializable {
 
+    private String topic; // redundant field
     private int totalQueues;
     private String bname;  //identify the host name
     //the newest mapping is in current broker
     private Map<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new HashMap<Integer, List<LogicQueueMappingItem>>();
 
 
-    public TopicQueueMappingInfo(int totalQueues, String bname) {
+    public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
+        this.topic = topic;
         this.totalQueues = totalQueues;
         this.bname = bname;
     }
@@ -57,5 +61,7 @@ public class TopicQueueMappingInfo {
         return bname;
     }
 
-
+    public String getTopic() {
+        return topic;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 04f126b..f724695 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -209,4 +209,8 @@ public class RequestCode {
     public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE = 417;
     public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT = 418;
     public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY = 419;
+
+
+    public static final int UPDATE_AND_CREATE_STATIC_TOPIC = 513;
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
similarity index 54%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
index ea9d17c..4caba89 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
@@ -14,32 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.common.protocol.body;
 
-package org.apache.rocketmq.common.protocol.header;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+public class TopicQueueMappingBody extends TopicQueueMappingInfo {
 
-public class GetTopicConfigRequestHeader implements CommandCustomHeader {
-    @Override
-    public void checkFields() throws RemotingCommandException {
+    public TopicQueueMappingBody(String topic, int totalQueues, String bname) {
+        super(topic, totalQueues, bname);
     }
-
-    @CFNotNull
-    private String topic;
-
-    /**
-     * @return the topic
-     */
-    public String getTopic() {
-        return topic;
-    }
-
-    /**
-     * @param topic the topic to set
-     */
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-}
\ No newline at end of file
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
index ef3f758..1d3d6c5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
@@ -18,7 +18,7 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.Map;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
index ea9d17c..2b5d040 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
@@ -29,6 +29,8 @@ public class GetTopicConfigRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String topic;
 
+    private Boolean withMapping;
+
     /**
      * @return the topic
      */
@@ -42,4 +44,12 @@ public class GetTopicConfigRequestHeader implements CommandCustomHeader {
     public void setTopic(String topic) {
         this.topic = topic;
     }
+
+    public Boolean getWithMapping() {
+        return withMapping;
+    }
+
+    public void setWithMapping(Boolean withMapping) {
+        this.withMapping = withMapping;
+    }
 }
\ No newline at end of file

[rocketmq] 01/02: Add definition for logic queue

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit e862ac88fc5aae370648e6495f54476bff844e52
Author: dongeforever <do...@apache.org>
AuthorDate: Sat Nov 6 11:54:21 2021 +0800

    Add definition for logic queue
---
 .../rocketmq/broker/BrokerPathConfigHelper.java    |  4 +
 .../broker/topic/TopicQueueMappingManager.java     | 86 ++++++++++++++++++++++
 .../body/TopicQueueMappingSerializeWrapper.java    | 45 +++++++++++
 .../protocol/route/LogicQueueMappingItem.java      | 54 ++++++++++++++
 .../protocol/route/TopicQueueMappingInfo.java      | 61 +++++++++++++++
 5 files changed, 250 insertions(+)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index 43a9946..e7a72e0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -35,6 +35,10 @@ public class BrokerPathConfigHelper {
         return rootDir + File.separator + "config" + File.separator + "topics.json";
     }
 
+    public static String getTopicQueueMappingPath(final String rootDir) {
+        return rootDir + File.separator + "config" + File.separator + "topicqueuemapping.json";
+    }
+
     public static String getConsumerOffsetPath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
new file mode 100644
index 0000000..9ee0f51
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
+import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TopicQueueMappingManager extends ConfigManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final long LOCK_TIMEOUT_MILLIS = 3000;
+    private transient final Lock lock = new ReentrantLock();
+
+    private final DataVersion dataVersion = new DataVersion();
+    private transient BrokerController brokerController;
+
+    private final ConcurrentMap<String, TopicQueueMappingInfo> topicQueueMappingTable = new ConcurrentHashMap<>();
+
+
+    public TopicQueueMappingManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+
+    }
+
+    @Override
+    public String encode(boolean pretty) {
+        TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper();
+        wrapper.setTopicQueueMappingInfoMap(topicQueueMappingTable);
+        wrapper.setDataVersion(this.dataVersion);
+        return JSON.toJSONString(wrapper, pretty);
+    }
+
+    @Override
+    public String encode() {
+        return encode(false);
+    }
+
+    @Override
+    public String configFilePath() {
+        return BrokerPathConfigHelper.getTopicQueueMappingPath(this.brokerController.getMessageStoreConfig()
+            .getStorePathRootDir());
+    }
+
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            TopicQueueMappingSerializeWrapper wrapper = TopicQueueMappingSerializeWrapper.fromJson(jsonString, TopicQueueMappingSerializeWrapper.class);
+            if (wrapper != null) {
+                this.topicQueueMappingTable.putAll(wrapper.getTopicQueueMappingInfoMap());
+                this.dataVersion.assignNewOne(wrapper.getDataVersion());
+            }
+        }
+    }
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
new file mode 100644
index 0000000..ef3f758
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.Map;
+
+public class TopicQueueMappingSerializeWrapper extends RemotingSerializable {
+    private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap;
+    private DataVersion dataVersion = new DataVersion();
+
+    public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
+        return topicQueueMappingInfoMap;
+    }
+
+    public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap) {
+        this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
+    }
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+    public void setDataVersion(DataVersion dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java
new file mode 100644
index 0000000..fc5cbe6
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java
@@ -0,0 +1,54 @@
+package org.apache.rocketmq.common.protocol.route;
+
+public class LogicQueueMappingItem {
+
+    private int gen; //generation, mutable
+    private int queueId;
+    private String bname;
+    private long logicOffset; // the start of the logic offset
+    private long startOffset; // the start of the physical offset
+    private long timeOfStart = -1; //mutable
+
+    public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long timeOfStart) {
+        this.gen = gen;
+        this.queueId = queueId;
+        this.bname = bname;
+        this.logicOffset = logicOffset;
+        this.startOffset = startOffset;
+        this.timeOfStart = timeOfStart;
+    }
+
+    public int getGen() {
+        return gen;
+    }
+
+    public void setGen(int gen) {
+        this.gen = gen;
+    }
+
+
+    public long getTimeOfStart() {
+        return timeOfStart;
+    }
+
+    public void setTimeOfStart(long timeOfStart) {
+        this.timeOfStart = timeOfStart;
+    }
+
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public String getBname() {
+        return bname;
+    }
+
+    public long getLogicOffset() {
+        return logicOffset;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java
new file mode 100644
index 0000000..0376965
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.route;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TopicQueueMappingInfo {
+
+    private int totalQueues;
+    private String bname;  //identify the host name
+    //the newest mapping is in current broker
+    private Map<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new HashMap<Integer, List<LogicQueueMappingItem>>();
+
+
+    public TopicQueueMappingInfo(int totalQueues, String bname) {
+        this.totalQueues = totalQueues;
+        this.bname = bname;
+    }
+
+    public boolean putMappingInfo(Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
+        if (mappingInfo.isEmpty()) {
+            return true;
+        }
+        hostedQueues.put(globalId, mappingInfo);
+        return true;
+    }
+
+    public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
+        return hostedQueues.get(globalId);
+    }
+
+    public int getTotalQueues() {
+        return totalQueues;
+    }
+
+    public void setTotalQueues(int totalQueues) {
+        this.totalQueues = totalQueues;
+    }
+
+    public String getBname() {
+        return bname;
+    }
+
+
+}