You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/07 13:07:46 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Finish the test for topic queue mapping clean service
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 1d7807b Finish the test for topic queue mapping clean service
1d7807b is described below
commit 1d7807bbb79137bf3160c7fd1134a4679bd11ab6
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Dec 7 21:07:25 2021 +0800
Finish the test for topic queue mapping clean service
---
.../apache/rocketmq/broker/BrokerController.java | 4 +
.../topic/TopicQueueMappingCleanService.java | 47 +++++---
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 2 +-
.../common/statictopic/TopicQueueMappingUtils.java | 13 ++-
.../rocketmq/test/base/IntegrationTestBase.java | 3 +-
.../rocketmq/test/statictopic/StaticTopicIT.java | 118 +++++++++++++++++++++
.../apache/rocketmq/tools/admin/MQAdminUtils.java | 69 +++---------
7 files changed, 187 insertions(+), 69 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 4ef34f9..338f31e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1438,4 +1438,8 @@ public class BrokerController {
public QueryAssignmentProcessor getQueryAssignmentProcessor() {
return queryAssignmentProcessor;
}
+
+ public TopicQueueMappingCleanService getTopicQueueMappingCleanService() {
+ return topicQueueMappingCleanService;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
index 91fd60d..05c5003 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -75,6 +76,10 @@ public class TopicQueueMappingCleanService extends ServiceThread {
log.info("Start topic queue mapping clean service thread!");
while (!this.isStopped()) {
try {
+ this.waitForRunning(5L * 60 * 1000);
+ } catch (Throwable ignored) {
+ }
+ try {
cleanItemExpired();
} catch (Throwable t) {
log.error("topic queue mapping cleanItemExpired failed", t);
@@ -84,11 +89,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
} catch (Throwable t) {
log.error("topic queue mapping cleanItemListMoreThanSecondGen failed", t);
}
- try {
- this.waitForRunning(5L * 60 * 1000);
- } catch (Throwable ignore) {
- }
}
log.info("End topic queue mapping clean service thread!");
}
@@ -119,7 +120,10 @@ public class TopicQueueMappingCleanService extends ServiceThread {
}
Set<String> brokers = new HashSet<>();
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
- if (items.size() < 2) {
+ if (items.size() <= 1) {
+ continue;
+ }
+ if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
@@ -138,11 +142,18 @@ public class TopicQueueMappingCleanService extends ServiceThread {
}
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
} catch (Throwable rt) {
- log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+ log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt);
}
}
- for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
- if (items.size() < 2) {
+ Map<Integer, List<LogicQueueMappingItem>> newHostedQueues = new HashMap<>();
+ boolean changedForTopic = false;
+ for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
+ Integer qid = entry.getKey();
+ List<LogicQueueMappingItem> items = entry.getValue();
+ if (items.size() <= 1) {
+ continue;
+ }
+ if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
@@ -153,7 +164,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
if (topicOffset == null) {
//this may should not happen
- log.warn("Get null topicOffset for {}", earlistItem);
+ log.error("Get null topicOffset for {} {}",topic, earlistItem);
continue;
}
//ignore the maxOffset < 0, which may in case of some error
@@ -161,11 +172,20 @@ public class TopicQueueMappingCleanService extends ServiceThread {
|| topicOffset.getMaxOffset() == 0) {
List<LogicQueueMappingItem> newItems = new ArrayList<>(items);
boolean result = newItems.remove(earlistItem);
- this.topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, true, false);
- changed = changed || result;
- log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+ if (result) {
+ changedForTopic = true;
+ newHostedQueues.put(qid, newItems);
+ }
+ log.info("The logic queue item {} {} is removed {} because of {}", topic, earlistItem, result, topicOffset);
}
}
+ if (changedForTopic) {
+ TopicQueueMappingDetail newMappingDetail = new TopicQueueMappingDetail(mappingDetail.getTopic(), mappingDetail.getTotalQueues(), mappingDetail.getBname(), mappingDetail.getEpoch());
+ newMappingDetail.getHostedQueues().putAll(mappingDetail.getHostedQueues());
+ newMappingDetail.getHostedQueues().putAll(newHostedQueues);
+ this.topicQueueMappingManager.updateTopicQueueMapping(newMappingDetail, false, true, false);
+ changed = true;
+ }
} catch (Throwable tt) {
log.error("Try CleanItemExpired failed for {}", topic, tt);
} finally {
@@ -241,6 +261,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setBname(broker);
+ header.setWithMapping(true);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
@@ -252,7 +273,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
mappingDetailMap.put(broker, mappingDetailRemote);
}
} catch (Throwable rt) {
- log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+ log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt);
}
}
//check all the info
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 6d75df9..eaa22be 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -229,7 +229,7 @@ public class RpcClientImpl implements RpcClient {
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
- rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(requestCommand.getBody(), bodyClass)));
+ rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(responseCommand.getBody(), bodyClass)));
break;
}
default:{
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 1b52904..5c564a3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -197,8 +197,8 @@ public class TopicQueueMappingUtils {
if (oldItems == null || oldItems.isEmpty()) {
return;
}
- if (newItems == null || newItems.isEmpty() || newItems.size() < oldItems.size()) {
- throw new RuntimeException("The new item list is smaller than old ones");
+ if (newItems == null || newItems.isEmpty()) {
+ throw new RuntimeException("The new item list is null or empty");
}
int iold = 0, inew = 0;
while (iold < oldItems.size() && inew < newItems.size()) {
@@ -665,4 +665,13 @@ public class TopicQueueMappingUtils {
return null;
}
+
+ public static boolean checkIfLeader(List<LogicQueueMappingItem> items, TopicQueueMappingDetail mappingDetail) {
+ if (items == null
+ || mappingDetail == null
+ || items.isEmpty()) {
+ return false;
+ }
+ return items.get(items.size() - 1).getBname().equals(mappingDetail.getBname());
+ }
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 06f079c..56ee320 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -129,7 +129,7 @@ public class IntegrationTestBase {
String baseDir = createBaseDir();
BrokerConfig brokerConfig = new BrokerConfig();
MessageStoreConfig storeConfig = new MessageStoreConfig();
- brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
+ brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.incrementAndGet());
brokerConfig.setBrokerIP1("127.0.0.1");
brokerConfig.setNamesrvAddr(nsAddr);
brokerConfig.setEnablePropertyFilter(true);
@@ -139,6 +139,7 @@ public class IntegrationTestBase {
storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
storeConfig.setMaxIndexNum(INDEX_NUM);
storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
+ storeConfig.setDeleteWhen("01;02;03;04;05;06;07;08;09;10;11;12;13;14;15;16;17;18;19;20;21;22;23;00");
return createAndStartBroker(storeConfig, brokerConfig);
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index 14bb967..fdefb06 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -1,10 +1,13 @@
package org.apache.rocketmq.test.statictopic;
+import com.alibaba.fastjson.JSON;
import org.apache.log4j.Logger;
+import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
@@ -357,6 +360,121 @@ public class StaticTopicIT extends BaseConf {
}
+ public void sendMessagesAndCheck(RMQNormalProducer producer, String broker, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception {
+ ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
+ List<MessageQueue> messageQueueList = producer.getMessageQueue();
+ Assert.assertEquals(queueNum, messageQueueList.size());
+ for (int i = 0; i < queueNum; i++) {
+ MessageQueue messageQueue = messageQueueList.get(i);
+ Assert.assertEquals(i, messageQueue.getQueueId());
+ Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+ String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
+ Assert.assertEquals(destBrokerName, broker);
+ }
+
+ for(MessageQueue messageQueue: messageQueueList) {
+ producer.send(msgEachQueue, messageQueue);
+ }
+ Assert.assertEquals(0, producer.getSendErrorMsg().size());
+ //leave the time to build the cq
+ Thread.sleep(100);
+ for(MessageQueue messageQueue: messageQueueList) {
+ Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+ Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue));
+ }
+ }
+
+
+ @Test
+ public void testRemappingAndClear() throws Exception {
+ String topic = "static" + MQRandomUtils.getRandomTopic();
+ RMQNormalProducer producer = getProducer(nsAddr, topic);
+ int queueNum = 10;
+ int msgEachQueue = 100;
+ //create to broker1Name
+ {
+ Set<String> targetBrokers = new HashSet<>();
+ targetBrokers.add(broker1Name);
+ MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+ //leave the time to refresh the metadata
+ Thread.sleep(500);
+ sendMessagesAndCheck(producer, broker1Name, topic, queueNum, msgEachQueue, 0L);
+ }
+
+ //remapping to broker2Name
+ {
+ Set<String> targetBrokers = new HashSet<>();
+ targetBrokers.add(broker2Name);
+ MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+ //leave the time to refresh the metadata
+ Thread.sleep(500);
+ sendMessagesAndCheck(producer, broker2Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
+ }
+
+ //remapping to broker3Name
+ {
+ Set<String> targetBrokers = new HashSet<>();
+ targetBrokers.add(broker3Name);
+ MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+ //leave the time to refresh the metadata
+ Thread.sleep(500);
+ sendMessagesAndCheck(producer, broker3Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE * 2);
+ }
+
+ // 1 -> 2 -> 3, currently 1 should not has any mappings
+
+ {
+ for (int i = 0; i < 10; i++) {
+ for (BrokerController brokerController: brokerControllerList) {
+ brokerController.getTopicQueueMappingCleanService().wakeup();
+ }
+ Thread.sleep(100);
+ }
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+ Assert.assertEquals(brokerNum, brokerConfigMap.size());
+ TopicConfigAndQueueMapping config1 = brokerConfigMap.get(broker1Name);
+ TopicConfigAndQueueMapping config2 = brokerConfigMap.get(broker2Name);
+ TopicConfigAndQueueMapping config3 = brokerConfigMap.get(broker3Name);
+ Assert.assertEquals(0, config1.getMappingDetail().getHostedQueues().size());
+ Assert.assertEquals(queueNum, config2.getMappingDetail().getHostedQueues().size());
+
+ Assert.assertEquals(queueNum, config3.getMappingDetail().getHostedQueues().size());
+
+ }
+ {
+ Set<String> topics = new HashSet<>(brokerController1.getTopicConfigManager().getTopicConfigTable().keySet());
+ topics.remove(topic);
+ brokerController1.getMessageStore().cleanUnusedTopic(topics);
+ brokerController2.getMessageStore().cleanUnusedTopic(topics);
+ for (int i = 0; i < 10; i++) {
+ for (BrokerController brokerController: brokerControllerList) {
+ brokerController.getTopicQueueMappingCleanService().wakeup();
+ }
+ Thread.sleep(100);
+ }
+
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+ Assert.assertEquals(brokerNum, brokerConfigMap.size());
+ TopicConfigAndQueueMapping config1 = brokerConfigMap.get(broker1Name);
+ TopicConfigAndQueueMapping config2 = brokerConfigMap.get(broker2Name);
+ TopicConfigAndQueueMapping config3 = brokerConfigMap.get(broker3Name);
+ Assert.assertEquals(0, config1.getMappingDetail().getHostedQueues().size());
+ Assert.assertEquals(queueNum, config2.getMappingDetail().getHostedQueues().size());
+ Assert.assertEquals(queueNum, config3.getMappingDetail().getHostedQueues().size());
+ //The first leader will clear it
+ for (List<LogicQueueMappingItem> items : config1.getMappingDetail().getHostedQueues().values()) {
+ Assert.assertEquals(3, items.size());
+ }
+ //The second leader do nothing
+ for (List<LogicQueueMappingItem> items : config3.getMappingDetail().getHostedQueues().values()) {
+ Assert.assertEquals(1, items.size());
+ }
+
+ }
+
+
+ }
+
@Test
public void testRemappingWithNegativeLogicOffset() throws Exception {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index eed6b76..0fda471 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -190,61 +190,26 @@ public class MQAdminUtils {
public static Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(String topic, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, InterruptedException, MQBrokerException {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
ClientMetadata clientMetadata = new ClientMetadata();
- boolean getFromBrokers = false;
- TopicRouteData routeData = null;
- try {
- routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
- } catch (MQClientException exception) {
- if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
- throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
- } else {
- getFromBrokers = true;
- }
+ //check all the brokers
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ if (clusterInfo != null
+ && clusterInfo.getBrokerAddrTable() != null) {
+ clientMetadata.refreshClusterInfo(clusterInfo);
}
- if (!getFromBrokers) {
- if (routeData != null
- && !routeData.getQueueDatas().isEmpty()) {
- clientMetadata.freshTopicRoute(topic, routeData);
- for (QueueData queueData: routeData.getQueueDatas()) {
- String bname = queueData.getBrokerName();
- String addr = clientMetadata.findMasterBrokerAddr(bname);
- try {
- TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
- //allow the config is null
- if (mapping != null) {
- brokerConfigMap.put(bname, mapping);
- }
- } catch (MQBrokerException exception) {
- if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
- throw exception;
- }
+ for (String broker : clientMetadata.getBrokerAddrTable().keySet()) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ try {
+ TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+ //allow the config is null
+ if (mapping != null) {
+ if (mapping.getMappingDetail() != null) {
+ assert mapping.getMappingDetail().getBname().equals(broker);
}
-
+ brokerConfigMap.put(broker, mapping);
}
- }
- } else {
- //if cannot get from nameserver, then check all the brokers
- ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- if (clusterInfo != null
- && clusterInfo.getBrokerAddrTable() != null) {
- clientMetadata.refreshClusterInfo(clusterInfo);
- }
- for (Map.Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
- String bname = entry.getKey();
- HashMap<Long, String> map = entry.getValue();
- String addr = map.get(MixAll.MASTER_ID);
- if (addr != null) {
- try {
- TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
- //allow the config is null
- if (mapping != null) {
- brokerConfigMap.put(bname, mapping);
- }
- } catch (MQBrokerException exception1) {
- if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
- throw exception1;
- }
- }
+ } catch (MQBrokerException exception1) {
+ if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+ throw exception1;
}
}
}