You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/19 12:33:40 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Add the check logic for admin process update-and-create static topic
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new d18d787 Add the check logic for admin process update-and-create static topic
d18d787 is described below
commit d18d787f92eb44020a482205e27bae4b9c725957
Author: dongeforever <do...@apache.org>
AuthorDate: Fri Nov 19 20:33:22 2021 +0800
Add the check logic for admin process update-and-create static topic
---
.../broker/processor/AdminBrokerProcessor.java | 19 +++++++---
.../broker/topic/TopicQueueMappingManager.java | 44 +++++++++++++++++++++-
.../apache/rocketmq/client/impl/MQAdminImpl.java | 4 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 4 +-
.../protocol/header/CreateTopicRequestHeader.java | 12 ++++++
.../common/statictopic/TopicQueueMappingUtils.java | 2 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 4 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 4 +-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 2 +-
.../topic/RemappingStaticTopicSubCommand.java | 12 +++---
.../command/topic/UpdateStaticTopicSubCommand.java | 2 +-
11 files changed, 85 insertions(+), 24 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 2f0aafb..bab484d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -323,6 +323,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (TopicValidator.isSystemTopic(topic, response)) {
return response;
}
+ boolean force = false;
+ if (requestHeader.getForce() != null && requestHeader.getForce()) {
+ force = true;
+ }
TopicConfig topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
@@ -331,13 +335,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
- this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
-
- this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail());
+ try {
+ this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+ this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail(), force);
- response.setCode(ResponseCode.SUCCESS);
+ this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+ response.setCode(ResponseCode.SUCCESS);
+ } catch (Exception e) {
+ log.error("Update static failed for [{}]", request, e);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(e.getMessage());
+ }
return response;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 13f0857..e2b46fe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
@@ -56,8 +57,47 @@ public class TopicQueueMappingManager extends ConfigManager {
this.brokerController = brokerController;
}
- public void updateTopicQueueMapping(TopicQueueMappingDetail topicQueueMappingDetail) {
- topicQueueMappingTable.put(topicQueueMappingDetail.getTopic(), topicQueueMappingDetail);
+ public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) {
+ lock.lock();
+ try {
+ if (newDetail == null) {
+ return;
+ }
+ newDetail.getHostedQueues().forEach((queueId, items) -> {
+ TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
+ });
+
+ TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
+ if (oldDetail == null) {
+ topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+ return;
+ }
+ if (force) {
+ oldDetail.getHostedQueues().forEach( (queueId, items) -> {
+ newDetail.getHostedQueues().putIfAbsent(queueId, items);
+ });
+ topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+ return;
+ }
+ //do more check
+ if (newDetail.getEpoch() <= oldDetail.getEpoch()) {
+ throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
+ }
+ for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
+ ImmutableList<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
+ ImmutableList<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId);
+ if (newItems == null) {
+ //keep the old
+ newDetail.getHostedQueues().put(globalId, oldItems);
+ } else {
+ TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems);
+ }
+ }
+ topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+ } finally {
+ lock.unlock();
+ }
+
}
public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 4df7b9e..7d539cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -80,11 +80,11 @@ public class MQAdminImpl {
this.timeoutMillis = timeoutMillis;
}
- public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
+ public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, boolean force) throws MQClientException {
MQClientException exception = null;
for (int i = 0; i < 3; i++) {
try {
- this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, timeoutMillis);
+ this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
break;
} catch (Exception e) {
if (2 == i) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index d93f690..050fc30 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -2726,7 +2726,7 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail,
+ public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
@@ -2738,7 +2738,7 @@ public class MQClientAPIImpl {
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
-
+ requestHeader.setForce(force);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC, requestHeader);
request.setBody(topicQueueMappingDetail.encode());
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
index 8894d0b..290ec4c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
@@ -23,6 +23,7 @@ package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class CreateTopicRequestHeader implements CommandCustomHeader {
@@ -42,6 +43,9 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
@CFNotNull
private Boolean order = false;
+ @CFNullable
+ private Boolean force = false;
+
@Override
public void checkFields() throws RemotingCommandException {
try {
@@ -118,4 +122,12 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
public void setOrder(Boolean order) {
this.order = order;
}
+
+ public Boolean getForce() {
+ return force;
+ }
+
+ public void setForce(Boolean force) {
+ this.force = force;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 370f0ec..eb82cad 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -162,7 +162,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
}
- public static void checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
+ public static void makeSureLogicQueueMappingItemImmutable(ImmutableList<LogicQueueMappingItem> oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
if (oldItems == null || oldItems.isEmpty()) {
return;
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6b0f5a6..7030056 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -661,8 +661,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
+ public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
@Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 811ee26..06b456f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1096,8 +1096,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
- this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
+ public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws MQClientException {
+ this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 5339709..888dad8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -337,7 +337,7 @@ public interface MQAdminExt extends MQAdmin {
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
- void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException,
+ void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 3b1b27a..c46c28a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -111,7 +111,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
- doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
+ doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -123,19 +123,19 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
- ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+ ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
@@ -171,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
}
}
@@ -353,7 +353,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
System.out.println("The old mapping data is written to file " + newMappingDataFile);
}
- doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt);
+ doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 6184290..7c6abdc 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -127,7 +127,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
- defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+ defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
}
}