You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/17 08:07:34 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Add epoch, dirty to the topic mapping detail

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 0c6ee5c  Add epoch, dirty to the topic mapping detail
0c6ee5c is described below

commit 0c6ee5c50b2b6077ecbe06289e4b7d1a1ab50cca
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 17 16:07:05 2021 +0800

    Add epoch, dirty to the topic mapping detail
---
 .../rocketmq/common/TopicQueueMappingDetail.java      |  6 +++---
 .../apache/rocketmq/common/TopicQueueMappingInfo.java | 19 ++++++++++++++-----
 .../apache/rocketmq/common/rpc/ClientMetadata.java    |  2 ++
 .../command/topic/UpdateStaticTopicSubCommand.java    | 11 +++++++++++
 4 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index c5d6ebb..9b67751 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -29,8 +29,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
     // make sure this value is not null
     private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
 
-    public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int gen) {
-        super(topic, totalQueues, bname, gen);
+    public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int epoch) {
+        super(topic, totalQueues, bname, epoch);
         buildIdMap();
     }
 
@@ -118,7 +118,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
 
 
     public TopicQueueMappingInfo cloneAsMappingInfo() {
-        TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.gen);
+        TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch);
         topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
         topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 7f4a201..b2e8591 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -28,19 +28,28 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     String topic; // redundant field
     int totalQueues;
     String bname;  //identify the hosted broker name
-    int gen; //important to fence the old dirty data
+    int epoch; //important to fence the old dirty data
+    boolean dirty; //indicate if the data is dirty
     //register to broker to construct the route
     transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
     //register to broker to help detect remapping failure
     transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
 
-    public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int gen) {
+    public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int epoch) {
         this.topic = topic;
         this.totalQueues = totalQueues;
         this.bname = bname;
-        this.gen = gen;
+        this.epoch = epoch;
+        this.dirty = false;
     }
 
+    public boolean isDirty() {
+        return dirty;
+    }
+
+    public void setDirty(boolean dirty) {
+        this.dirty = dirty;
+    }
 
     public int getTotalQueues() {
         return totalQueues;
@@ -58,8 +67,8 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
         return topic;
     }
 
-    public int getGen() {
-        return gen;
+    public int getEpoch() {
+        return epoch;
     }
 
     public ConcurrentMap<Integer, Integer> getCurrIdMap() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 23fbc6f..e2dd076 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -57,9 +57,11 @@ public class ClientMetadata {
             return new ConcurrentHashMap<MessageQueue, String>();
         }
         ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<MessageQueue, String>();
+
         int totalNums = 0;
         for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
             String brokerName = entry.getKey();
+            //TODO check the epoch of
             if (entry.getValue().getTotalQueues() > totalNums) {
                 if (totalNums != 0) {
                     log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 1ca9fd5..dc23780 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.srvutil.ServerUtil;
@@ -74,6 +75,16 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
         defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
 
         try {
+
+            String topic = commandLine.getOptionValue('t').trim();
+            int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
+            String cluster = commandLine.getOptionValue('c').trim();
+
+            //first check the topic route
+            {
+                TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+
+            }
             TopicConfig topicConfig = new TopicConfig();
             topicConfig.setReadQueueNums(8);
             topicConfig.setWriteQueueNums(8);