You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/06 09:41:28 UTC
[rocketmq] 02/02: Add update_static_topic code
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 3e2c9202392c9a13259662023bbcb997c9770f7f
Author: dongeforever <do...@apache.org>
AuthorDate: Sat Nov 6 17:39:57 2021 +0800
Add update_static_topic code
---
.../apache/rocketmq/broker/BrokerController.java | 9 +++
.../broker/processor/AdminBrokerProcessor.java | 75 +++++++++++++---------
.../broker/topic/TopicQueueMappingManager.java | 9 ++-
.../route => }/LogicQueueMappingItem.java | 2 +-
...Header.java => TopicConfigAndQueueMapping.java} | 34 ++++------
.../route => }/TopicQueueMappingInfo.java | 14 ++--
.../rocketmq/common/protocol/RequestCode.java | 4 ++
.../TopicQueueMappingBody.java} | 31 ++-------
.../body/TopicQueueMappingSerializeWrapper.java | 2 +-
.../header/GetTopicConfigRequestHeader.java | 10 +++
10 files changed, 106 insertions(+), 84 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 5eb9169..27cba02 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -80,6 +80,7 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
@@ -179,6 +180,7 @@ public class BrokerController {
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
private TopicConfigManager topicConfigManager;
+ private TopicQueueMappingManager topicQueueMappingManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService ackMessageExecutor;
@@ -215,6 +217,7 @@ public class BrokerController {
this.messageStoreConfig = messageStoreConfig;
this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.topicConfigManager = new TopicConfigManager(this);
+ this.topicQueueMappingManager = new TopicQueueMappingManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
this.popMessageProcessor = new PopMessageProcessor(this);
@@ -287,6 +290,8 @@ public class BrokerController {
public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load();
+ result = result && this.topicQueueMappingManager.load();
+
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
@@ -1184,6 +1189,10 @@ public class BrokerController {
this.topicConfigManager = topicConfigManager;
}
+ public TopicQueueMappingManager getTopicQueueMappingManager() {
+ return topicQueueMappingManager;
+ }
+
public String getHAServerAddr() {
return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index e7b7949..99c7031 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -54,13 +54,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
-import org.apache.rocketmq.common.AclConfig;
-import org.apache.rocketmq.common.MQVersion;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.PlainAccessConfig;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.TopicQueueId;
-import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.*;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
@@ -74,28 +68,7 @@ import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
-import org.apache.rocketmq.common.protocol.body.BrokerStatsItem;
-import org.apache.rocketmq.common.protocol.body.Connection;
-import org.apache.rocketmq.common.protocol.body.ConsumeQueueData;
-import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
-import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
-import org.apache.rocketmq.common.protocol.body.CreateMessageQueueForLogicalQueueRequestBody;
-import org.apache.rocketmq.common.protocol.body.GroupList;
-import org.apache.rocketmq.common.protocol.body.KVTable;
-import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
-import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
-import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
-import org.apache.rocketmq.common.protocol.body.ProducerConnection;
-import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
-import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
-import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
-import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
-import org.apache.rocketmq.common.protocol.body.ReuseTopicLogicalQueueRequestBody;
-import org.apache.rocketmq.common.protocol.body.SealTopicLogicalQueueRequestBody;
-import org.apache.rocketmq.common.protocol.body.TopicList;
-import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
-import org.apache.rocketmq.common.protocol.body.UpdateTopicLogicalQueueMappingRequestBody;
+import org.apache.rocketmq.common.protocol.body.*;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
@@ -282,6 +255,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return migrateTopicLogicalQueueCommit(ctx, request);
case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY:
return migrateTopicLogicalQueueNotify(ctx, request);
+ case RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC:
+ return this.updateAndCreateStaticTopic(ctx, request);
default:
return getUnknownCmdResponse(ctx, request);
}
@@ -323,6 +298,42 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
+ private synchronized RemotingCommand updateAndCreateStaticTopic(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ final CreateTopicRequestHeader requestHeader =
+ (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
+ log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+ final TopicQueueMappingBody topicQueueMappingBody = RemotingSerializable.decode(request.getBody(), TopicQueueMappingBody.class);
+
+ String topic = requestHeader.getTopic();
+
+ if (!TopicValidator.validateTopic(topic, response)) {
+ return response;
+ }
+ if (TopicValidator.isSystemTopic(topic, response)) {
+ return response;
+ }
+
+ TopicConfig topicConfig = new TopicConfig(topic);
+ topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
+ topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
+ topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
+ topicConfig.setPerm(requestHeader.getPerm());
+ topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
+
+ this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
+
+ this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody);
+
+ this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+
+ response.setCode(ResponseCode.SUCCESS);
+ return response;
+ }
+
+
private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -1715,7 +1726,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
return response;
}
- String content = JSONObject.toJSONString(topicConfig);
+ TopicQueueMappingInfo topicQueueMappingInfo = null;
+ if (Boolean.TRUE.equals(requestHeader.getWithMapping())) {
+ topicQueueMappingInfo = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
+ }
+ String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingInfo));
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 9ee0f51..c885b31 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
-import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -45,7 +45,14 @@ public class TopicQueueMappingManager extends ConfigManager {
public TopicQueueMappingManager(BrokerController brokerController) {
this.brokerController = brokerController;
+ }
+
+ public void updateTopicQueueMapping(TopicQueueMappingInfo topicQueueMappingInfo) {
+ topicQueueMappingTable.put(topicQueueMappingInfo.getTopic(), topicQueueMappingInfo);
+ }
+ public TopicQueueMappingInfo getTopicQueueMapping(String topic) {
+ return topicQueueMappingTable.get(topic);
}
@Override
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
similarity index 95%
rename from common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java
rename to common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index fc5cbe6..50d88ae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.common.protocol.route;
+package org.apache.rocketmq.common;
public class LogicQueueMappingItem {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
similarity index 55%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
index ea9d17c..f9a6ab4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
@@ -14,32 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.common;
-package org.apache.rocketmq.common.protocol.header;
+public class TopicConfigAndQueueMapping extends TopicConfig {
+ private TopicConfig topicConfig;
+ private TopicQueueMappingInfo topicQueueMappingInfo;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class GetTopicConfigRequestHeader implements CommandCustomHeader {
- @Override
- public void checkFields() throws RemotingCommandException {
+ public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingInfo topicQueueMappingInfo) {
+ this.topicConfig = topicConfig;
+ this.topicQueueMappingInfo = topicQueueMappingInfo;
}
- @CFNotNull
- private String topic;
-
- /**
- * @return the topic
- */
- public String getTopic() {
- return topic;
+ public TopicQueueMappingInfo getTopicQueueMappingInfo() {
+ return topicQueueMappingInfo;
}
- /**
- * @param topic the topic to set
- */
- public void setTopic(String topic) {
- this.topic = topic;
+ public TopicConfig getTopicConfig() {
+ return topicConfig;
}
-}
\ No newline at end of file
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
similarity index 82%
rename from common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java
rename to common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 0376965..0956a99 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -14,21 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.common.protocol.route;
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class TopicQueueMappingInfo {
+public class TopicQueueMappingInfo extends RemotingSerializable {
+ private String topic; // redundant field
private int totalQueues;
private String bname; //identify the host name
//the newest mapping is in current broker
private Map<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new HashMap<Integer, List<LogicQueueMappingItem>>();
- public TopicQueueMappingInfo(int totalQueues, String bname) {
+ public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
+ this.topic = topic;
this.totalQueues = totalQueues;
this.bname = bname;
}
@@ -57,5 +61,7 @@ public class TopicQueueMappingInfo {
return bname;
}
-
+ public String getTopic() {
+ return topic;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 04f126b..f724695 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -209,4 +209,8 @@ public class RequestCode {
public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE = 417;
public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT = 418;
public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY = 419;
+
+
+ public static final int UPDATE_AND_CREATE_STATIC_TOPIC = 513;
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
similarity index 54%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
index ea9d17c..4caba89 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java
@@ -14,32 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.common.protocol.body;
-package org.apache.rocketmq.common.protocol.header;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+public class TopicQueueMappingBody extends TopicQueueMappingInfo {
-public class GetTopicConfigRequestHeader implements CommandCustomHeader {
- @Override
- public void checkFields() throws RemotingCommandException {
+ public TopicQueueMappingBody(String topic, int totalQueues, String bname) {
+ super(topic, totalQueues, bname);
}
-
- @CFNotNull
- private String topic;
-
- /**
- * @return the topic
- */
- public String getTopic() {
- return topic;
- }
-
- /**
- * @param topic the topic to set
- */
- public void setTopic(String topic) {
- this.topic = topic;
- }
-}
\ No newline at end of file
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
index ef3f758..1d3d6c5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.Map;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
index ea9d17c..2b5d040 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
@@ -29,6 +29,8 @@ public class GetTopicConfigRequestHeader implements CommandCustomHeader {
@CFNotNull
private String topic;
+ private Boolean withMapping;
+
/**
* @return the topic
*/
@@ -42,4 +44,12 @@ public class GetTopicConfigRequestHeader implements CommandCustomHeader {
public void setTopic(String topic) {
this.topic = topic;
}
+
+ public Boolean getWithMapping() {
+ return withMapping;
+ }
+
+ public void setWithMapping(Boolean withMapping) {
+ this.withMapping = withMapping;
+ }
}
\ No newline at end of file