You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/06 09:41:28 UTC

[rocketmq] 02/02: Add update_static_topic code

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 3e2c9202392c9a13259662023bbcb997c9770f7f
Author: dongeforever <do...@apache.org>
AuthorDate: Sat Nov 6 17:39:57 2021 +0800

    Add update_static_topic code
---
 .../apache/rocketmq/broker/BrokerController.java   |  9 +++
 .../broker/processor/AdminBrokerProcessor.java     | 75 +++++++++++++---------
 .../broker/topic/TopicQueueMappingManager.java     |  9 ++-
 .../route => }/LogicQueueMappingItem.java          |  2 +-
 ...Header.java => TopicConfigAndQueueMapping.java} | 34 ++++------
 .../route => }/TopicQueueMappingInfo.java          | 14 ++--
 .../rocketmq/common/protocol/RequestCode.java      |  4 ++
 .../TopicQueueMappingBody.java}                    | 31 ++-------
 .../body/TopicQueueMappingSerializeWrapper.java    |  2 +-
 .../header/GetTopicConfigRequestHeader.java        | 10 +++
 10 files changed, 106 insertions(+), 84 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 5eb9169..27cba02 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -80,6 +80,7 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor;
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
 import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
@@ -179,6 +180,7 @@ public class BrokerController {
     private RemotingServer remotingServer;
     private RemotingServer fastRemotingServer;
     private TopicConfigManager topicConfigManager;
+    private TopicQueueMappingManager topicQueueMappingManager;
     private ExecutorService sendMessageExecutor;
     private ExecutorService pullMessageExecutor;
     private ExecutorService ackMessageExecutor;
@@ -215,6 +217,7 @@ public class BrokerController {
         this.messageStoreConfig = messageStoreConfig;
         this.consumerOffsetManager = new ConsumerOffsetManager(this);
         this.topicConfigManager = new TopicConfigManager(this);
+        this.topicQueueMappingManager = new TopicQueueMappingManager(this);
         this.pullMessageProcessor = new PullMessageProcessor(this);
         this.pullRequestHoldService = new PullRequestHoldService(this);
         this.popMessageProcessor = new PopMessageProcessor(this);
@@ -287,6 +290,8 @@ public class BrokerController {
     public boolean initialize() throws CloneNotSupportedException {
         boolean result = this.topicConfigManager.load();
 
+        result = result && this.topicQueueMappingManager.load();
+
         result = result && this.consumerOffsetManager.load();
         result = result && this.subscriptionGroupManager.load();
         result = result && this.consumerFilterManager.load();
@@ -1184,6 +1189,10 @@ public class BrokerController {
         this.topicConfigManager = topicConfigManager;
     }
 
+    public TopicQueueMappingManager getTopicQueueMappingManager() {
+        return topicQueueMappingManager;
+    }
+
     public String getHAServerAddr() {
         return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index e7b7949..99c7031 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -54,13 +54,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
-import org.apache.rocketmq.common.AclConfig;
-import org.apache.rocketmq.common.MQVersion;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.PlainAccessConfig;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.TopicQueueId;
-import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.*;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.admin.TopicOffset;
@@ -74,28 +68,7 @@ import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
-import org.apache.rocketmq.common.protocol.body.BrokerStatsItem;
-import org.apache.rocketmq.common.protocol.body.Connection;
-import org.apache.rocketmq.common.protocol.body.ConsumeQueueData;
-import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
-import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
-import org.apache.rocketmq.common.protocol.body.CreateMessageQueueForLogicalQueueRequestBody;
-import org.apache.rocketmq.common.protocol.body.GroupList;
-import org.apache.rocketmq.common.protocol.body.KVTable;
-import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
-import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
-import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
-import org.apache.rocketmq.common.protocol.body.ProducerConnection;
-import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
-import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
-import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
-import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
-import org.apache.rocketmq.common.protocol.body.ReuseTopicLogicalQueueRequestBody;
-import org.apache.rocketmq.common.protocol.body.SealTopicLogicalQueueRequestBody;
-import org.apache.rocketmq.common.protocol.body.TopicList;
-import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
-import org.apache.rocketmq.common.protocol.body.UpdateTopicLogicalQueueMappingRequestBody;
+import org.apache.rocketmq.common.protocol.body.*;
 import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
@@ -282,6 +255,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 return migrateTopicLogicalQueueCommit(ctx, request);
             case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY:
                 return migrateTopicLogicalQueueNotify(ctx, request);
+            case RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC:
+                return this.updateAndCreateStaticTopic(ctx, request);
             default:
                 return getUnknownCmdResponse(ctx, request);
         }
@@ -323,6 +298,42 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
+    private synchronized RemotingCommand updateAndCreateStaticTopic(ChannelHandlerContext ctx,
+                                                              RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final CreateTopicRequestHeader requestHeader =
+                (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
+        log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        final TopicQueueMappingBody topicQueueMappingBody = RemotingSerializable.decode(request.getBody(), TopicQueueMappingBody.class);
+
+        String topic = requestHeader.getTopic();
+
+        if (!TopicValidator.validateTopic(topic, response)) {
+            return response;
+        }
+        if (TopicValidator.isSystemTopic(topic, response)) {
+            return response;
+        }
+
+        TopicConfig topicConfig = new TopicConfig(topic);
+        topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
+        topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
+        topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
+        topicConfig.setPerm(requestHeader.getPerm());
+        topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
+
+        this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
+
+        this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody);
+
+        this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+
+        response.setCode(ResponseCode.SUCCESS);
+        return response;
+    }
+
+
     private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -1715,7 +1726,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
             return response;
         }
-        String content = JSONObject.toJSONString(topicConfig);
+        TopicQueueMappingInfo topicQueueMappingInfo = null;
+        if (Boolean.TRUE.equals(requestHeader.getWithMapping())) {
+            topicQueueMappingInfo = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
+        }
+        String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingInfo));
         try {
             response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
         } catch (UnsupportedEncodingException e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 9ee0f51..c885b31 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
-import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
@@ -45,7 +45,14 @@ public class TopicQueueMappingManager extends ConfigManager {
 
     public TopicQueueMappingManager(BrokerController brokerController) {
         this.brokerController = brokerController;
+    }
+
+    public void updateTopicQueueMapping(TopicQueueMappingInfo topicQueueMappingInfo) {
+        topicQueueMappingTable.put(topicQueueMappingInfo.getTopic(), topicQueueMappingInfo);
+    }
 
+    public TopicQueueMappingInfo getTopicQueueMapping(String topic) {
+        return topicQueueMappingTable.get(topic);
     }
 
     @Override
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
similarity index 95%
rename from common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java
rename to common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index fc5cbe6..50d88ae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.common.protocol.route;
+package org.apache.rocketmq.common;
 
 public class LogicQueueMappingItem {
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
similarity index 55%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
index ea9d17c..f9a6ab4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
@@ -14,32 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.common;
 
-package org.apache.rocketmq.common.protocol.header;
+public class TopicConfigAndQueueMapping extends TopicConfig {
+    private TopicConfig topicConfig;
+    private TopicQueueMappingInfo topicQueueMappingInfo;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class GetTopicConfigRequestHeader implements CommandCustomHeader {
-    @Override
-    public void checkFields() throws RemotingCommandException {
+    public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingInfo topicQueueMappingInfo) {
+        this.topicConfig = topicConfig;
+        this.topicQueueMappingInfo = topicQueueMappingInfo;
     }
 
-    @CFNotNull
-    private String topic;
-
-    /**
-     * @return the topic
-     */
-    public String getTopic() {
-        return topic;
+    public TopicQueueMappingInfo getTopicQueueMappingInfo() {
+        return topicQueueMappingInfo;
     }
 
-    /**
-     * @param topic the topic to set
-     */
-    public void setTopic(String topic) {
-        this.topic = topic;
+    public TopicConfig getTopicConfig() {
+        return topicConfig;
     }
-}
\ No newline at end of file
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
similarity index 82%
rename from common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java
rename to common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 0376965..0956a99 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -14,21 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.common.protocol.route;
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class TopicQueueMappingInfo {
+public class TopicQueueMappingInfo extends RemotingSerializable {
 
+    private String topic; // redundant field
     private int totalQueues;
     private String bname;  //identify the host name
     //the newest mapping is in current broker
     private Map<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new HashMap<Integer, List<LogicQueueMappingItem>>();
 
 
-    public TopicQueueMappingInfo(int totalQueues, String bname) {
+    public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
+        this.topic = topic;
         this.totalQueues = totalQueues;
         this.bname = bname;
     }
@@ -57,5 +61,7 @@ public class TopicQueueMappingInfo {
         return bname;
     }
 
-
+    public String getTopic() {
+        return topic;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 04f126b..f724695 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -209,4 +209,8 @@ public class RequestCode {
     public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE = 417;
     public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT = 418;
     public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY = 419;
+
+
+    public static final int UPDATE_AND_CREATE_STATIC_TOPIC = 513;
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
similarity index 54%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
index ea9d17c..4caba89 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
@@ -14,32 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.common.protocol.body;
 
-package org.apache.rocketmq.common.protocol.header;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+public class TopicQueueMappingBody extends TopicQueueMappingInfo {
 
-public class GetTopicConfigRequestHeader implements CommandCustomHeader {
-    @Override
-    public void checkFields() throws RemotingCommandException {
+    public TopicQueueMappingBody(String topic, int totalQueues, String bname) {
+        super(topic, totalQueues, bname);
     }
-
-    @CFNotNull
-    private String topic;
-
-    /**
-     * @return the topic
-     */
-    public String getTopic() {
-        return topic;
-    }
-
-    /**
-     * @param topic the topic to set
-     */
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-}
\ No newline at end of file
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
index ef3f758..1d3d6c5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
@@ -18,7 +18,7 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.Map;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
index ea9d17c..2b5d040 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
@@ -29,6 +29,8 @@ public class GetTopicConfigRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String topic;
 
+    private Boolean withMapping;
+
     /**
      * @return the topic
      */
@@ -42,4 +44,12 @@ public class GetTopicConfigRequestHeader implements CommandCustomHeader {
     public void setTopic(String topic) {
         this.topic = topic;
     }
+
+    public Boolean getWithMapping() {
+        return withMapping;
+    }
+
+    public void setWithMapping(Boolean withMapping) {
+        this.withMapping = withMapping;
+    }
 }
\ No newline at end of file