You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/06 09:47:51 UTC

[rocketmq] 01/02: Clean the items more than second gen

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 527382e4ce5cc03435b199f4b3c618be5590210c
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Dec 6 16:44:50 2021 +0800

    Clean the items more than second gen
---
 .../broker/processor/AdminBrokerProcessor.java     |  5 ++-
 .../broker/topic/TopicQueueMappingManager.java     | 50 +++++++++++++++++++++-
 .../statictopic/TopicQueueMappingDetail.java       | 13 ++++++
 .../common/statictopic/TopicQueueMappingInfo.java  | 12 ++++++
 .../namesrv/routeinfo/RouteInfoManager.java        |  1 +
 5 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a286818..20dfc85 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -353,13 +353,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         }
 
         this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
+        this.brokerController.getTopicQueueMappingManager().delete(topic);
+
         this.brokerController.getMessageStore()
             .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
         if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
             this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
         }
-        //TODO delete the topic route
-        //this.brokerController.getTopicQueueMappingManager()
+
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         return response;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 9be3717..c484bcf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -33,7 +33,9 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -61,6 +63,7 @@ public class TopicQueueMappingManager extends ConfigManager {
     public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception {
         boolean locked = false;
         boolean updated = false;
+        TopicQueueMappingDetail oldDetail = null;
         try {
             if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 locked = true;
@@ -74,7 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager {
                 TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
             });
 
-            TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
+            oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
             if (oldDetail == null) {
                 topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
                 updated = true;
@@ -115,11 +118,23 @@ public class TopicQueueMappingManager extends ConfigManager {
             }
             if (updated) {
                 this.persist();
+                log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force);
             }
         }
 
     }
 
+    public void delete(final String topic) {
+        final TopicQueueMappingDetail removed = this.topicQueueMappingTable.remove(topic);
+        if (removed == null) {
+            log.warn("delete topic queue mapping failed, topic: {} not exists", topic);
+            return;
+        }
+        log.info("delete topic queue mapping OK, topic queue mapping: {}", removed);
+        this.dataVersion.nextVersion(); // only an actual removal advances the version and is persisted
+        this.persist();
+    }
+
     public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
         return topicQueueMappingTable.get(topic);
     }
@@ -177,6 +192,9 @@ public class TopicQueueMappingManager extends ConfigManager {
             //it is not static topic
             return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
         }
+
+        assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName());
+
         //If not find mappingItem, it encounters some errors
         Integer globalId = requestHeader.getQueueId();
         if (globalId < 0 && !selectOneWhenMiss) {
@@ -224,4 +242,34 @@ public class TopicQueueMappingManager extends ConfigManager {
     }
 
 
+    /** Drops hosted queues holding more than two items when neither of the two latest items carries the mapping's bname. */
+    public void cleanItemListMoreThanSecondGen() {
+        for (String topic : topicQueueMappingTable.keySet()) {
+            final TopicQueueMappingDetail detail = topicQueueMappingTable.get(topic);
+            if (detail == null || detail.getHostedQueues().isEmpty()) {
+                continue;
+            }
+            if (!detail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
+                log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", detail);
+                continue;
+            }
+            final Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> cursor = detail.getHostedQueues().entrySet().iterator();
+            while (cursor.hasNext()) {
+                final Map.Entry<Integer, List<LogicQueueMappingItem>> hosted = cursor.next();
+                final Integer globalId = hosted.getKey();
+                final List<LogicQueueMappingItem> history = hosted.getValue();
+                if (history.size() <= 2) {
+                    continue;
+                }
+                final LogicQueueMappingItem newest = history.get(history.size() - 1);
+                final LogicQueueMappingItem previous = history.get(history.size() - 2);
+                if (newest.getBname().equals(detail.getBname()) || previous.getBname().equals(detail.getBname())) {
+                    continue;
+                }
+                cursor.remove();
+                log.info("The topic queue {} {} is expired with items {}", detail.getTopic(), globalId, history);
+            }
+        }
+    }
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 1749b8e..f0f27a5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -128,4 +128,17 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
                 .append(hostedQueues)
                 .toHashCode();
     }
+
+    /** Renders hostedQueues, then topic, totalQueues, bname, epoch, dirty and currIdMap, for log output. */
+    @Override
+    public String toString() {
+        return new StringBuilder("TopicQueueMappingDetail{hostedQueues=").append(hostedQueues)
+                .append(", topic='").append(topic).append('\'')
+                .append(", totalQueues=").append(totalQueues)
+                .append(", bname='").append(bname).append('\'')
+                .append(", epoch=").append(epoch)
+                .append(", dirty=").append(dirty)
+                .append(", currIdMap=").append(currIdMap)
+                .append('}').toString();
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index 53041aa..a6a7eb5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -124,4 +124,16 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
                 .append(currIdMap)
                 .toHashCode();
     }
+
+    /** Renders topic, totalQueues, bname, epoch, dirty and currIdMap, for log output. */
+    @Override
+    public String toString() {
+        return new StringBuilder("TopicQueueMappingInfo{topic='").append(topic).append('\'')
+                .append(", totalQueues=").append(totalQueues)
+                .append(", bname='").append(bname).append('\'')
+                .append(", epoch=").append(epoch)
+                .append(", dirty=").append(dirty)
+                .append(", currIdMap=").append(currIdMap)
+                .append('}').toString();
+    }
 }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 60a0f81..a02d3f1 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -169,6 +169,7 @@ public class RouteInfoManager {
                                 topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());
                             }
                             //Note asset brokerName equal entry.getValue().getBname()
+                            //here use the mappingDetail.bname
                             topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                         }
                     }