You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/07 13:07:46 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Finish the test for topic queue mapping clean service

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 1d7807b  Finish the test for topic queue mapping clean service
1d7807b is described below

commit 1d7807bbb79137bf3160c7fd1134a4679bd11ab6
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Dec 7 21:07:25 2021 +0800

    Finish the test for topic queue mapping clean service
---
 .../apache/rocketmq/broker/BrokerController.java   |   4 +
 .../topic/TopicQueueMappingCleanService.java       |  47 +++++---
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |   2 +-
 .../common/statictopic/TopicQueueMappingUtils.java |  13 ++-
 .../rocketmq/test/base/IntegrationTestBase.java    |   3 +-
 .../rocketmq/test/statictopic/StaticTopicIT.java   | 118 +++++++++++++++++++++
 .../apache/rocketmq/tools/admin/MQAdminUtils.java  |  69 +++---------
 7 files changed, 187 insertions(+), 69 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 4ef34f9..338f31e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1438,4 +1438,8 @@ public class BrokerController {
     public QueryAssignmentProcessor getQueryAssignmentProcessor() {
         return queryAssignmentProcessor;
     }
+
+    public TopicQueueMappingCleanService getTopicQueueMappingCleanService() {
+        return topicQueueMappingCleanService;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
index 91fd60d..05c5003 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -75,6 +76,10 @@ public class TopicQueueMappingCleanService extends ServiceThread {
         log.info("Start topic queue mapping clean service thread!");
         while (!this.isStopped()) {
             try {
+                this.waitForRunning(5L * 60 * 1000);
+            } catch (Throwable ignored) {
+            }
+            try {
                 cleanItemExpired();
             } catch (Throwable t) {
                 log.error("topic queue mapping cleanItemExpired failed", t);
@@ -84,11 +89,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
             } catch (Throwable t) {
                 log.error("topic queue mapping cleanItemListMoreThanSecondGen failed", t);
             }
-            try {
-                this.waitForRunning(5L * 60 * 1000);
-            } catch (Throwable ignore) {
 
-            }
         }
         log.info("End topic queue mapping clean service  thread!");
     }
@@ -119,7 +120,10 @@ public class TopicQueueMappingCleanService extends ServiceThread {
                     }
                     Set<String> brokers = new HashSet<>();
                     for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
-                        if (items.size() < 2) {
+                        if (items.size() <= 1) {
+                            continue;
+                        }
+                        if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
                             continue;
                         }
                         LogicQueueMappingItem earlistItem = items.get(0);
@@ -138,11 +142,18 @@ public class TopicQueueMappingCleanService extends ServiceThread {
                             }
                             statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
                         } catch (Throwable rt) {
-                            log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+                            log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt);
                         }
                     }
-                    for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
-                        if (items.size() < 2) {
+                    Map<Integer, List<LogicQueueMappingItem>> newHostedQueues = new HashMap<>();
+                    boolean changedForTopic = false;
+                    for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
+                        Integer qid = entry.getKey();
+                        List<LogicQueueMappingItem> items = entry.getValue();
+                        if (items.size() <= 1) {
+                            continue;
+                        }
+                        if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
                             continue;
                         }
                         LogicQueueMappingItem earlistItem = items.get(0);
@@ -153,7 +164,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
                         TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
                         if (topicOffset == null) {
                             //this may should not happen
-                            log.warn("Get null topicOffset for {}", earlistItem);
+                            log.error("Get null topicOffset for {} {}",topic,  earlistItem);
                             continue;
                         }
                         //ignore the maxOffset < 0, which may in case of some error
@@ -161,11 +172,20 @@ public class TopicQueueMappingCleanService extends ServiceThread {
                             || topicOffset.getMaxOffset() == 0) {
                             List<LogicQueueMappingItem> newItems = new ArrayList<>(items);
                             boolean result = newItems.remove(earlistItem);
-                            this.topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, true, false);
-                            changed = changed || result;
-                            log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+                            if (result) {
+                                changedForTopic = true;
+                                newHostedQueues.put(qid, newItems);
+                            }
+                            log.info("The logic queue item {} {} is removed {} because of {}", topic, earlistItem, result, topicOffset);
                         }
                     }
+                    if (changedForTopic) {
+                        TopicQueueMappingDetail newMappingDetail = new TopicQueueMappingDetail(mappingDetail.getTopic(), mappingDetail.getTotalQueues(), mappingDetail.getBname(), mappingDetail.getEpoch());
+                        newMappingDetail.getHostedQueues().putAll(mappingDetail.getHostedQueues());
+                        newMappingDetail.getHostedQueues().putAll(newHostedQueues);
+                        this.topicQueueMappingManager.updateTopicQueueMapping(newMappingDetail, false, true, false);
+                        changed = true;
+                    }
                 } catch (Throwable tt) {
                     log.error("Try CleanItemExpired failed for {}", topic, tt);
                 } finally {
@@ -241,6 +261,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
                         GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
                         header.setTopic(topic);
                         header.setBname(broker);
+                        header.setWithMapping(true);
                         try {
                             RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
                             RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
@@ -252,7 +273,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
                                 mappingDetailMap.put(broker, mappingDetailRemote);
                             }
                         } catch (Throwable rt) {
-                            log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+                            log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt);
                         }
                     }
                     //check all the info
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 6d75df9..eaa22be 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -229,7 +229,7 @@ public class RpcClientImpl implements RpcClient {
         assert responseCommand != null;
         switch (responseCommand.getCode()) {
             case ResponseCode.SUCCESS: {
-                rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(requestCommand.getBody(), bodyClass)));
+                rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(responseCommand.getBody(), bodyClass)));
                 break;
             }
             default:{
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 1b52904..5c564a3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -197,8 +197,8 @@ public class TopicQueueMappingUtils {
         if (oldItems == null || oldItems.isEmpty()) {
             return;
         }
-        if (newItems == null || newItems.isEmpty() || newItems.size() < oldItems.size()) {
-            throw new RuntimeException("The new item list is smaller than old ones");
+        if (newItems == null || newItems.isEmpty()) {
+            throw new RuntimeException("The new item list is null or empty");
         }
         int iold = 0, inew = 0;
         while (iold < oldItems.size() && inew < newItems.size()) {
@@ -665,4 +665,13 @@ public class TopicQueueMappingUtils {
         return null;
     }
 
+
+    public static boolean checkIfLeader(List<LogicQueueMappingItem> items, TopicQueueMappingDetail mappingDetail) {
+        if (items == null
+            || mappingDetail == null
+            || items.isEmpty()) {
+            return false;
+        }
+        return items.get(items.size() - 1).getBname().equals(mappingDetail.getBname());
+    }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 06f079c..56ee320 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -129,7 +129,7 @@ public class IntegrationTestBase {
         String baseDir = createBaseDir();
         BrokerConfig brokerConfig = new BrokerConfig();
         MessageStoreConfig storeConfig = new MessageStoreConfig();
-        brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
+        brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.incrementAndGet());
         brokerConfig.setBrokerIP1("127.0.0.1");
         brokerConfig.setNamesrvAddr(nsAddr);
         brokerConfig.setEnablePropertyFilter(true);
@@ -139,6 +139,7 @@ public class IntegrationTestBase {
         storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
         storeConfig.setMaxIndexNum(INDEX_NUM);
         storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
+        storeConfig.setDeleteWhen("01;02;03;04;05;06;07;08;09;10;11;12;13;14;15;16;17;18;19;20;21;22;23;00");
         return createAndStartBroker(storeConfig, brokerConfig);
 
     }
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index 14bb967..fdefb06 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -1,10 +1,13 @@
 package org.apache.rocketmq.test.statictopic;
 
+import com.alibaba.fastjson.JSON;
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
@@ -357,6 +360,121 @@ public class StaticTopicIT extends BaseConf {
     }
 
 
+    public void sendMessagesAndCheck(RMQNormalProducer producer, String broker, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception {
+        ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
+        List<MessageQueue> messageQueueList = producer.getMessageQueue();
+        Assert.assertEquals(queueNum, messageQueueList.size());
+        for (int i = 0; i < queueNum; i++) {
+            MessageQueue messageQueue = messageQueueList.get(i);
+            Assert.assertEquals(i, messageQueue.getQueueId());
+            Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+            String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
+            Assert.assertEquals(destBrokerName, broker);
+        }
+
+        for(MessageQueue messageQueue: messageQueueList) {
+            producer.send(msgEachQueue, messageQueue);
+        }
+        Assert.assertEquals(0, producer.getSendErrorMsg().size());
+        //leave the time to build the cq
+        Thread.sleep(100);
+        for(MessageQueue messageQueue: messageQueueList) {
+            Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+            Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue));
+        }
+    }
+
+
+    @Test
+    public void testRemappingAndClear() throws Exception {
+        String topic = "static" + MQRandomUtils.getRandomTopic();
+        RMQNormalProducer producer = getProducer(nsAddr, topic);
+        int queueNum = 10;
+        int msgEachQueue = 100;
+        //create to broker1Name
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker1Name);
+            MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+            //leave the time to refresh the metadata
+            Thread.sleep(500);
+            sendMessagesAndCheck(producer, broker1Name, topic, queueNum, msgEachQueue, 0L);
+        }
+
+        //remapping to broker2Name
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker2Name);
+            MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+            //leave the time to refresh the metadata
+            Thread.sleep(500);
+            sendMessagesAndCheck(producer, broker2Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
+        }
+
+        //remapping to broker3Name
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker3Name);
+            MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+            //leave the time to refresh the metadata
+            Thread.sleep(500);
+            sendMessagesAndCheck(producer, broker3Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE * 2);
+        }
+
+        // 1 -> 2 -> 3, currently 1 should not have any mappings
+
+        {
+            for (int i = 0; i < 10; i++) {
+                for (BrokerController brokerController: brokerControllerList) {
+                    brokerController.getTopicQueueMappingCleanService().wakeup();
+                }
+                Thread.sleep(100);
+            }
+            Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+            Assert.assertEquals(brokerNum, brokerConfigMap.size());
+            TopicConfigAndQueueMapping config1 = brokerConfigMap.get(broker1Name);
+            TopicConfigAndQueueMapping config2 = brokerConfigMap.get(broker2Name);
+            TopicConfigAndQueueMapping config3 = brokerConfigMap.get(broker3Name);
+            Assert.assertEquals(0, config1.getMappingDetail().getHostedQueues().size());
+            Assert.assertEquals(queueNum, config2.getMappingDetail().getHostedQueues().size());
+
+            Assert.assertEquals(queueNum, config3.getMappingDetail().getHostedQueues().size());
+
+        }
+        {
+            Set<String> topics =  new HashSet<>(brokerController1.getTopicConfigManager().getTopicConfigTable().keySet());
+            topics.remove(topic);
+            brokerController1.getMessageStore().cleanUnusedTopic(topics);
+            brokerController2.getMessageStore().cleanUnusedTopic(topics);
+            for (int i = 0; i < 10; i++) {
+                for (BrokerController brokerController: brokerControllerList) {
+                    brokerController.getTopicQueueMappingCleanService().wakeup();
+                }
+                Thread.sleep(100);
+            }
+
+            Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+            Assert.assertEquals(brokerNum, brokerConfigMap.size());
+            TopicConfigAndQueueMapping config1 = brokerConfigMap.get(broker1Name);
+            TopicConfigAndQueueMapping config2 = brokerConfigMap.get(broker2Name);
+            TopicConfigAndQueueMapping config3 = brokerConfigMap.get(broker3Name);
+            Assert.assertEquals(0, config1.getMappingDetail().getHostedQueues().size());
+            Assert.assertEquals(queueNum, config2.getMappingDetail().getHostedQueues().size());
+            Assert.assertEquals(queueNum, config3.getMappingDetail().getHostedQueues().size());
+            //The first leader will clear it
+            for (List<LogicQueueMappingItem> items : config1.getMappingDetail().getHostedQueues().values()) {
+                Assert.assertEquals(3, items.size());
+            }
+            //The second leader does nothing
+            for (List<LogicQueueMappingItem> items : config3.getMappingDetail().getHostedQueues().values()) {
+                Assert.assertEquals(1, items.size());
+            }
+
+        }
+
+
+    }
+
 
     @Test
     public void testRemappingWithNegativeLogicOffset() throws Exception {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index eed6b76..0fda471 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -190,61 +190,26 @@ public class MQAdminUtils {
     public static Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(String topic, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException,  InterruptedException, MQBrokerException {
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
         ClientMetadata clientMetadata = new ClientMetadata();
-        boolean getFromBrokers = false;
-        TopicRouteData routeData = null;
-        try {
-            routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
-        } catch (MQClientException  exception) {
-            if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
-                throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
-            } else {
-                getFromBrokers = true;
-            }
+        //check all the brokers
+        ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+        if (clusterInfo != null
+                && clusterInfo.getBrokerAddrTable() != null) {
+            clientMetadata.refreshClusterInfo(clusterInfo);
         }
-        if (!getFromBrokers) {
-            if (routeData != null
-                    && !routeData.getQueueDatas().isEmpty()) {
-                clientMetadata.freshTopicRoute(topic, routeData);
-                for (QueueData queueData: routeData.getQueueDatas()) {
-                    String bname = queueData.getBrokerName();
-                    String addr = clientMetadata.findMasterBrokerAddr(bname);
-                    try {
-                        TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
-                        //allow the config is null
-                        if (mapping != null) {
-                            brokerConfigMap.put(bname, mapping);
-                        }
-                    } catch (MQBrokerException exception) {
-                        if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
-                            throw exception;
-                        }
+        for (String broker : clientMetadata.getBrokerAddrTable().keySet()) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            try {
+                TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+                //allow the config is null
+                if (mapping != null) {
+                    if (mapping.getMappingDetail() != null) {
+                        assert mapping.getMappingDetail().getBname().equals(broker);
                     }
-
+                    brokerConfigMap.put(broker, mapping);
                 }
-            }
-        } else {
-            //if cannot get from nameserver, then check all the brokers
-            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
-            if (clusterInfo != null
-                    && clusterInfo.getBrokerAddrTable() != null) {
-                clientMetadata.refreshClusterInfo(clusterInfo);
-            }
-            for (Map.Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
-                String bname = entry.getKey();
-                HashMap<Long, String> map = entry.getValue();
-                String addr = map.get(MixAll.MASTER_ID);
-                if (addr != null) {
-                    try {
-                        TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
-                        //allow the config is null
-                        if (mapping != null) {
-                            brokerConfigMap.put(bname, mapping);
-                        }
-                    }  catch (MQBrokerException exception1) {
-                        if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
-                            throw exception1;
-                        }
-                    }
+            }  catch (MQBrokerException exception1) {
+                if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+                    throw exception1;
                 }
             }
         }