You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/06 09:47:51 UTC
[rocketmq] 01/02: Clean the items more than second gen
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 527382e4ce5cc03435b199f4b3c618be5590210c
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Dec 6 16:44:50 2021 +0800
Clean the items more than second gen
---
.../broker/processor/AdminBrokerProcessor.java | 5 ++-
.../broker/topic/TopicQueueMappingManager.java | 50 +++++++++++++++++++++-
.../statictopic/TopicQueueMappingDetail.java | 13 ++++++
.../common/statictopic/TopicQueueMappingInfo.java | 12 ++++++
.../namesrv/routeinfo/RouteInfoManager.java | 1 +
5 files changed, 78 insertions(+), 3 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a286818..20dfc85 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -353,13 +353,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
}
this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
+ this.brokerController.getTopicQueueMappingManager().delete(topic);
+
this.brokerController.getMessageStore()
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
}
- //TODO delete the topic route
- //this.brokerController.getTopicQueueMappingManager()
+
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 9be3717..c484bcf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -33,7 +33,9 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -61,6 +63,7 @@ public class TopicQueueMappingManager extends ConfigManager {
public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception {
boolean locked = false;
boolean updated = false;
+ TopicQueueMappingDetail oldDetail = null;
try {
if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
locked = true;
@@ -74,7 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager {
TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
});
- TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
+ oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
if (oldDetail == null) {
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
updated = true;
@@ -115,11 +118,23 @@ public class TopicQueueMappingManager extends ConfigManager {
}
if (updated) {
this.persist();
+ log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force);
}
}
}
+ public void delete(final String topic) {
+ TopicQueueMappingDetail old = this.topicQueueMappingTable.remove(topic);
+ if (old != null) {
+ log.info("delete topic queue mapping OK, topic queue mapping: {}", old);
+ this.dataVersion.nextVersion();
+ this.persist();
+ } else {
+ log.warn("delete topic queue mapping failed, topic: {} not exists", topic);
+ }
+ }
+
public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
return topicQueueMappingTable.get(topic);
}
@@ -177,6 +192,9 @@ public class TopicQueueMappingManager extends ConfigManager {
//it is not static topic
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
}
+
+ assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName());
+
//If not find mappingItem, it encounters some errors
Integer globalId = requestHeader.getQueueId();
if (globalId < 0 && !selectOneWhenMiss) {
@@ -224,4 +242,34 @@ public class TopicQueueMappingManager extends ConfigManager {
}
+ public void cleanItemListMoreThanSecondGen() {
+ for(String topic : topicQueueMappingTable.keySet()) {
+ TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
+ if (mappingDetail == null
+ || mappingDetail.getHostedQueues().isEmpty()) {
+ continue;
+ }
+ if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
+ log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
+ continue;
+ }
+ Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
+ Integer queueId = entry.getKey();
+ List<LogicQueueMappingItem> items = entry.getValue();
+ if (items.size() <= 2) {
+ continue;
+ }
+ LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+ LogicQueueMappingItem secLeaderItem = items.get(items.size() - 2);
+ if (!leaderItem.getBname().equals(mappingDetail.getBname())
+ && !secLeaderItem.getBname().equals(mappingDetail.getBname())) {
+ it.remove();
+ log.info("The topic queue {} {} is expired with items {}", mappingDetail.getTopic(), queueId, items);
+ }
+ }
+ }
+ }
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 1749b8e..f0f27a5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -128,4 +128,17 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
.append(hostedQueues)
.toHashCode();
}
+
+ @Override
+ public String toString() {
+ return "TopicQueueMappingDetail{" +
+ "hostedQueues=" + hostedQueues +
+ ", topic='" + topic + '\'' +
+ ", totalQueues=" + totalQueues +
+ ", bname='" + bname + '\'' +
+ ", epoch=" + epoch +
+ ", dirty=" + dirty +
+ ", currIdMap=" + currIdMap +
+ '}';
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index 53041aa..a6a7eb5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -124,4 +124,16 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
.append(currIdMap)
.toHashCode();
}
+
+ @Override
+ public String toString() {
+ return "TopicQueueMappingInfo{" +
+ "topic='" + topic + '\'' +
+ ", totalQueues=" + totalQueues +
+ ", bname='" + bname + '\'' +
+ ", epoch=" + epoch +
+ ", dirty=" + dirty +
+ ", currIdMap=" + currIdMap +
+ '}';
+ }
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 60a0f81..a02d3f1 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -169,6 +169,7 @@ public class RouteInfoManager {
topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());
}
//Note asset brokerName equal entry.getValue().getBname()
+ //here use the mappingDetail.bname
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
}