You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/17 08:07:34 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Add epoch, dirty to the topic mapping detail
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 0c6ee5c Add epoch, dirty to the topic mapping detail
0c6ee5c is described below
commit 0c6ee5c50b2b6077ecbe06289e4b7d1a1ab50cca
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 17 16:07:05 2021 +0800
Add epoch, dirty to the topic mapping detail
---
.../rocketmq/common/TopicQueueMappingDetail.java | 6 +++---
.../apache/rocketmq/common/TopicQueueMappingInfo.java | 19 ++++++++++++++-----
.../apache/rocketmq/common/rpc/ClientMetadata.java | 2 ++
.../command/topic/UpdateStaticTopicSubCommand.java | 11 +++++++++++
4 files changed, 30 insertions(+), 8 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index c5d6ebb..9b67751 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -29,8 +29,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// make sure this value is not null
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
- public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int gen) {
- super(topic, totalQueues, bname, gen);
+ public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int epoch) {
+ super(topic, totalQueues, bname, epoch);
buildIdMap();
}
@@ -118,7 +118,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public TopicQueueMappingInfo cloneAsMappingInfo() {
- TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.gen);
+ TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index 7f4a201..b2e8591 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -28,19 +28,28 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
String topic; // redundant field
int totalQueues;
String bname; //identify the hosted broker name
- int gen; //important to fence the old dirty data
+ int epoch; //important to fence the old dirty data
+ boolean dirty; //indicate if the data is dirty
//register to broker to construct the route
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
- public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int gen) {
+ public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int epoch) {
this.topic = topic;
this.totalQueues = totalQueues;
this.bname = bname;
- this.gen = gen;
+ this.epoch = epoch;
+ this.dirty = false;
}
+ public boolean isDirty() {
+ return dirty;
+ }
+
+ public void setDirty(boolean dirty) {
+ this.dirty = dirty;
+ }
public int getTotalQueues() {
return totalQueues;
@@ -58,8 +67,8 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
return topic;
}
- public int getGen() {
- return gen;
+ public int getEpoch() {
+ return epoch;
}
public ConcurrentMap<Integer, Integer> getCurrIdMap() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 23fbc6f..e2dd076 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -57,9 +57,11 @@ public class ClientMetadata {
return new ConcurrentHashMap<MessageQueue, String>();
}
ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<MessageQueue, String>();
+
int totalNums = 0;
for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
String brokerName = entry.getKey();
+ //TODO check the epoch of the mapping info (entry.getValue()) for each broker
if (entry.getValue().getTotalQueues() > totalNums) {
if (totalNums != 0) {
log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 1ca9fd5..dc23780 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -74,6 +75,16 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
+
+ String topic = commandLine.getOptionValue('t').trim();
+ int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
+ String cluster = commandLine.getOptionValue('c').trim();
+
+ //first check the topic route
+ {
+ TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+
+ }
TopicConfig topicConfig = new TopicConfig();
topicConfig.setReadQueueNums(8);
topicConfig.setWriteQueueNums(8);