You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/19 12:33:40 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Add the check logic for admin process update-and-create static topic

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new d18d787  Add the check logic for admin process update-and-create static topic
d18d787 is described below

commit d18d787f92eb44020a482205e27bae4b9c725957
Author: dongeforever <do...@apache.org>
AuthorDate: Fri Nov 19 20:33:22 2021 +0800

    Add the check logic for admin process update-and-create static topic
---
 .../broker/processor/AdminBrokerProcessor.java     | 19 +++++++---
 .../broker/topic/TopicQueueMappingManager.java     | 44 +++++++++++++++++++++-
 .../apache/rocketmq/client/impl/MQAdminImpl.java   |  4 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  4 +-
 .../protocol/header/CreateTopicRequestHeader.java  | 12 ++++++
 .../common/statictopic/TopicQueueMappingUtils.java |  2 +-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  4 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |  4 +-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  2 +-
 .../topic/RemappingStaticTopicSubCommand.java      | 12 +++---
 .../command/topic/UpdateStaticTopicSubCommand.java |  2 +-
 11 files changed, 85 insertions(+), 24 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 2f0aafb..bab484d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -323,6 +323,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         if (TopicValidator.isSystemTopic(topic, response)) {
             return response;
         }
+        boolean force = false;
+        if (requestHeader.getForce() != null && requestHeader.getForce()) {
+            force = true;
+        }
 
         TopicConfig topicConfig = new TopicConfig(topic);
         topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
@@ -331,13 +335,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         topicConfig.setPerm(requestHeader.getPerm());
         topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
 
-        this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
-
-        this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail());
+        try {
+            this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-        this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+            this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail(), force);
 
-        response.setCode(ResponseCode.SUCCESS);
+            this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+            response.setCode(ResponseCode.SUCCESS);
+        } catch (Exception e) {
+            log.error("Update static failed for [{}]", request, e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(e.getMessage());
+        }
         return response;
     }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 13f0857..e2b46fe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
@@ -56,8 +57,47 @@ public class TopicQueueMappingManager extends ConfigManager {
         this.brokerController = brokerController;
     }
 
-    public void updateTopicQueueMapping(TopicQueueMappingDetail topicQueueMappingDetail) {
-        topicQueueMappingTable.put(topicQueueMappingDetail.getTopic(), topicQueueMappingDetail);
+    public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) {
+        lock.lock();
+        try {
+            if (newDetail == null) {
+                return;
+            }
+            newDetail.getHostedQueues().forEach((queueId, items) -> {
+                TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
+            });
+
+            TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
+            if (oldDetail == null) {
+                topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+                return;
+            }
+            if (force) {
+                oldDetail.getHostedQueues().forEach( (queueId, items) -> {
+                    newDetail.getHostedQueues().putIfAbsent(queueId, items);
+                });
+                topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+                return;
+            }
+            //do more check
+            if (newDetail.getEpoch() <= oldDetail.getEpoch()) {
+                throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
+            }
+            for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
+                ImmutableList<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
+                ImmutableList<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId);
+                if (newItems == null) {
+                    //keep the old
+                    newDetail.getHostedQueues().put(globalId, oldItems);
+                } else {
+                    TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems);
+                }
+            }
+            topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+        } finally {
+            lock.unlock();
+        }
+
     }
 
     public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 4df7b9e..7d539cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -80,11 +80,11 @@ public class MQAdminImpl {
         this.timeoutMillis = timeoutMillis;
     }
 
-    public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
+    public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, boolean force) throws MQClientException {
         MQClientException exception = null;
         for (int i = 0; i < 3; i++) {
             try {
-                this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, timeoutMillis);
+                this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail,  force, timeoutMillis);
                 break;
             } catch (Exception e) {
                 if (2 == i) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index d93f690..050fc30 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -2726,7 +2726,7 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail,
+    public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force,
                             final long timeoutMillis)
             throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
@@ -2738,7 +2738,7 @@ public class MQClientAPIImpl {
         requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
         requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
         requestHeader.setOrder(topicConfig.isOrder());
-
+        requestHeader.setForce(force);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC, requestHeader);
         request.setBody(topicQueueMappingDetail.encode());
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
index 8894d0b..290ec4c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
@@ -23,6 +23,7 @@ package org.apache.rocketmq.common.protocol.header;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class CreateTopicRequestHeader implements CommandCustomHeader {
@@ -42,6 +43,9 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private Boolean order = false;
 
+    @CFNullable
+    private Boolean force = false;
+
     @Override
     public void checkFields() throws RemotingCommandException {
         try {
@@ -118,4 +122,12 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
     public void setOrder(Boolean order) {
         this.order = order;
     }
+
+    public Boolean getForce() {
+        return force;
+    }
+
+    public void setForce(Boolean force) {
+        this.force = force;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 370f0ec..eb82cad 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -162,7 +162,7 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
     }
 
-    public static void checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
+    public static void makeSureLogicQueueMappingItemImmutable(ImmutableList<LogicQueueMappingItem> oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
         if (oldItems == null || oldItems.isEmpty()) {
             return;
         }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6b0f5a6..7030056 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -661,8 +661,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
+    public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
     }
 
     @Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 811ee26..06b456f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1096,8 +1096,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
-        this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
+    public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws MQClientException {
+        this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 5339709..888dad8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -337,7 +337,7 @@ public interface MQAdminExt extends MQAdmin {
         LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
 
 
-    void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException,
+    void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
             InterruptedException, MQClientException;
 
     void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 3b1b27a..c46c28a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -111,7 +111,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                 throw new RuntimeException("The Cluster info is empty");
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
-            doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
+            doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
             return;
         }catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -123,19 +123,19 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
 
 
     public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
-                            ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+                            ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
         // now do the remapping
         //Step1: let the new leader can be write without the logicOffset
         for (String broker: brokersToMapIn) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
-            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
         }
         //Step2: forbid the write of old leader
         for (String broker: brokersToMapOut) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
-            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
         }
         //Step3: decide the logic offset
         for (String broker: brokersToMapOut) {
@@ -171,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
         for (String broker: brokersToMapIn) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
-            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
         }
     }
 
@@ -353,7 +353,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                 System.out.println("The old mapping data is written to file " + newMappingDataFile);
             }
 
-            doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt);
+            doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
 
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 6184290..7c6abdc 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -127,7 +127,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             String broker = entry.getKey();
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = entry.getValue();
-            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
         }
     }