You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/07 11:39:38 UTC
[rocketmq] 01/02: Add TopicQueueMappingCleanService
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 1ca218f18d62cf7b464bffbe50c2452cafbfc70a
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Dec 7 19:15:51 2021 +0800
Add TopicQueueMappingCleanService
---
.../apache/rocketmq/broker/BrokerController.java | 42 +--
.../broker/processor/AdminBrokerProcessor.java | 2 +-
.../topic/TopicQueueMappingCleanService.java | 310 +++++++++++++++++++++
.../broker/topic/TopicQueueMappingManager.java | 178 +-----------
.../broker/topic/TopicQueueMappingManagerTest.java | 2 +-
.../java/org/apache/rocketmq/common/MixAll.java | 4 +-
.../common/statictopic/TopicQueueMappingUtils.java | 11 +-
7 files changed, 342 insertions(+), 207 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6ca46dc..4ef34f9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
@@ -64,11 +65,9 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
-import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
@@ -76,7 +75,6 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
@@ -87,9 +85,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
@@ -163,9 +158,6 @@ public class BrokerController {
private final BrokerOuterAPI brokerOuterAPI;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread"));
- //the topic queue mapping is costly, so use an independent executor
- private final ScheduledExecutorService scheduledForTopicQueueMapping = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "BrokerControllerScheduledThread-TopicQueueMapping"));
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> ackThreadPoolQueue;
@@ -202,6 +194,7 @@ public class BrokerController {
private InetSocketAddress storeHost;
private BrokerFastFailure brokerFastFailure;
private Configuration configuration;
+ private TopicQueueMappingCleanService topicQueueMappingCleanService;
private FileWatchService fileWatchService;
private TransactionalMessageCheckService transactionalMessageCheckService;
private TransactionalMessageService transactionalMessageService;
@@ -501,21 +494,6 @@ public class BrokerController {
}
}, 1, 5, TimeUnit.SECONDS);
- this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
- try {
- this.topicQueueMappingManager.cleanItemListMoreThanSecondGen();
- } catch (Throwable t) {
- log.error("ScheduledTask cleanItemListMoreThanSecondGen failed", t);
- }
- }, 1, 5, TimeUnit.MINUTES);
-
- this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
- try {
- this.topicQueueMappingManager.cleanItemExpired();
- } catch (Throwable t) {
- log.error("ScheduledTask cleanItemExpired failed", t);
- }
- }, 1, 5, TimeUnit.MINUTES);
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
@@ -539,6 +517,8 @@ public class BrokerController {
}
}
+ this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
+
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
@@ -897,6 +877,10 @@ public class BrokerController {
this.fastRemotingServer.shutdown();
}
+ if (this.topicQueueMappingCleanService != null) {
+ this.topicQueueMappingCleanService.shutdown();
+ }
+
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
@@ -911,12 +895,6 @@ public class BrokerController {
} catch (InterruptedException e) {
}
- this.scheduledForTopicQueueMapping.shutdown();
- try {
- this.scheduledForTopicQueueMapping.awaitTermination(5000, TimeUnit.MILLISECONDS);
- } catch (Throwable ignored) {
- }
-
this.unregisterBrokerAll();
if (this.sendMessageExecutor != null) {
@@ -1020,6 +998,10 @@ public class BrokerController {
this.fastRemotingServer.start();
}
+ if (this.topicQueueMappingCleanService != null) {
+ this.topicQueueMappingCleanService.start();
+ }
+
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 20dfc85..270c953 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -323,7 +323,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force);
+ this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force, false, true);
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
response.setCode(ResponseCode.SUCCESS);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
new file mode 100644
index 0000000..91fd60d
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.rpc.RpcClient;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TopicQueueMappingCleanService extends ServiceThread {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ private TopicQueueMappingManager topicQueueMappingManager;
+ private BrokerOuterAPI brokerOuterAPI;
+ private RpcClient rpcClient;
+ private MessageStoreConfig messageStoreConfig;
+ private BrokerConfig brokerConfig;
+
+ public TopicQueueMappingCleanService(BrokerController brokerController) {
+ this.topicQueueMappingManager = brokerController.getTopicQueueMappingManager();
+ this.rpcClient = brokerController.getBrokerOuterAPI().getRpcClient();
+ this.messageStoreConfig = brokerController.getMessageStoreConfig();
+ this.brokerConfig = brokerController.getBrokerConfig();
+ this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
+ }
+
+ @Override
+ public String getServiceName() {
+ return TopicQueueMappingCleanService.class.getSimpleName();
+ }
+
+ @Override
+ public void run() {
+ log.info("Start topic queue mapping clean service thread!");
+ while (!this.isStopped()) {
+ try {
+ cleanItemExpired();
+ } catch (Throwable t) {
+ log.error("topic queue mapping cleanItemExpired failed", t);
+ }
+ try {
+ cleanItemListMoreThanSecondGen();
+ } catch (Throwable t) {
+ log.error("topic queue mapping cleanItemListMoreThanSecondGen failed", t);
+ }
+ try {
+ this.waitForRunning(5L * 60 * 1000);
+ } catch (Throwable ignore) {
+
+ }
+ }
+ log.info("End topic queue mapping clean service thread!");
+ }
+
+
+
+ public void cleanItemExpired() {
+ String when = messageStoreConfig.getDeleteWhen();
+ if (!UtilAll.isItTimeToDo(when)) {
+ return;
+ }
+ boolean changed = false;
+ long start = System.currentTimeMillis();
+ try {
+ for(String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
+ try {
+ if (isStopped()) {
+ break;
+ }
+ TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic);
+ if (mappingDetail == null
+ || mappingDetail.getHostedQueues().isEmpty()) {
+ continue;
+ }
+ if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) {
+ log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
+ continue;
+ }
+ Set<String> brokers = new HashSet<>();
+ for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+ if (items.size() < 2) {
+ continue;
+ }
+ LogicQueueMappingItem earlistItem = items.get(0);
+ brokers.add(earlistItem.getBname());
+ }
+ Map<String, TopicStatsTable> statsTable = new HashMap<>();
+ for (String broker: brokers) {
+ GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+ header.setTopic(topic);
+ header.setBname(broker);
+ try {
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+ RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+ } catch (Throwable rt) {
+ log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+ }
+ }
+ for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+ if (items.size() < 2) {
+ continue;
+ }
+ LogicQueueMappingItem earlistItem = items.get(0);
+ TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
+ if (topicStats == null) {
+ continue;
+ }
+ TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
+ if (topicOffset == null) {
+ //this may should not happen
+ log.warn("Get null topicOffset for {}", earlistItem);
+ continue;
+ }
+ //ignore the maxOffset < 0, which may in case of some error
+ if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()
+ || topicOffset.getMaxOffset() == 0) {
+ List<LogicQueueMappingItem> newItems = new ArrayList<>(items);
+ boolean result = newItems.remove(earlistItem);
+ this.topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, true, false);
+ changed = changed || result;
+ log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+ }
+ }
+ } catch (Throwable tt) {
+ log.error("Try CleanItemExpired failed for {}", topic, tt);
+ } finally {
+ UtilAll.sleep(10);
+ }
+ }
+ } catch (Throwable t) {
+ log.error("Try cleanItemExpired failed", t);
+ } finally {
+ if (changed) {
+ this.topicQueueMappingManager.getDataVersion().nextVersion();
+ this.topicQueueMappingManager.persist();
+ log.info("CleanItemExpired changed");
+ }
+ log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
+ }
+ }
+
+ public void cleanItemListMoreThanSecondGen() {
+ String when = messageStoreConfig.getDeleteWhen();
+ if (!UtilAll.isItTimeToDo(when)) {
+ return;
+ }
+ boolean changed = false;
+ long start = System.currentTimeMillis();
+ try {
+ ClientMetadata clientMetadata = new ClientMetadata();
+ for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
+ try {
+ if (isStopped()) {
+ break;
+ }
+ TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic);
+ if (mappingDetail == null
+ || mappingDetail.getHostedQueues().isEmpty()) {
+ continue;
+ }
+ if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) {
+ log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
+ continue;
+ }
+ Map<Integer, String> qid2CurrLeaderBroker = new HashMap<>();
+ for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
+ Integer qId = entry.getKey();
+ List<LogicQueueMappingItem> items = entry.getValue();
+ if (items.isEmpty()) {
+ continue;
+ }
+ LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+ if (!leaderItem.getBname().equals(mappingDetail.getBname())) {
+ qid2CurrLeaderBroker.put(qId, leaderItem.getBname());
+ }
+ }
+ if (qid2CurrLeaderBroker.isEmpty()) {
+ continue;
+ }
+ //find the topic route
+ TopicRouteData topicRouteData = brokerOuterAPI.getTopicRouteInfoFromNameServer(topic, brokerConfig.getForwardTimeout());
+ clientMetadata.freshTopicRoute(topic, topicRouteData);
+ Map<Integer, String> qid2RealLeaderBroker = new HashMap<>();
+ //fine the real leader
+ for (Map.Entry<Integer, String> entry : qid2CurrLeaderBroker.entrySet()) {
+ qid2RealLeaderBroker.put(entry.getKey(), clientMetadata.getBrokerNameFromMessageQueue(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, entry.getKey())));
+ }
+
+ //find the mapping detail of real leader
+ Map<String, TopicQueueMappingDetail> mappingDetailMap = new HashMap<>();
+ for (Map.Entry<Integer, String> entry : qid2RealLeaderBroker.entrySet()) {
+ if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(entry.getValue())) {
+ continue;
+ }
+ String broker = entry.getValue();
+ GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
+ header.setTopic(topic);
+ header.setBname(broker);
+ try {
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
+ RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ TopicQueueMappingDetail mappingDetailRemote = ((TopicConfigAndQueueMapping) rpcResponse.getBody()).getMappingDetail();
+ if (broker.equals(mappingDetailRemote.getBname())) {
+ mappingDetailMap.put(broker, mappingDetailRemote);
+ }
+ } catch (Throwable rt) {
+ log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+ }
+ }
+ //check all the info
+ Set<Integer> ids2delete = new HashSet<>();
+ for (Map.Entry<Integer, String> entry : qid2CurrLeaderBroker.entrySet()) {
+ Integer qId = entry.getKey();
+ String currLeaderBroker = entry.getValue();
+ String realLeaderBroker = qid2RealLeaderBroker.get(qId);
+ TopicQueueMappingDetail remoteMappingDetail = mappingDetailMap.get(realLeaderBroker);
+ if (remoteMappingDetail == null
+ || remoteMappingDetail.getTotalQueues() != mappingDetail.getTotalQueues()
+ || remoteMappingDetail.getEpoch() != mappingDetail.getEpoch()) {
+ continue;
+ }
+ List<LogicQueueMappingItem> items = remoteMappingDetail.getHostedQueues().get(qId);
+ if (items.isEmpty()) {
+ continue;
+ }
+ LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+ if (!realLeaderBroker.equals(leaderItem.getBname())) {
+ continue;
+ }
+ //all the check is ok
+ if (!realLeaderBroker.equals(currLeaderBroker)) {
+ ids2delete.add(qId);
+ }
+ }
+ for (Integer qid : ids2delete) {
+ List<LogicQueueMappingItem> items = mappingDetail.getHostedQueues().remove(qid);
+ changed = true;
+ if (items != null) {
+ log.info("Remove the ItemListMoreThanSecondGen topic {} qid {} items {}", topic, qid, items);
+ }
+ }
+ } catch (Throwable tt) {
+ log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
+ } finally {
+ UtilAll.sleep(10);
+ }
+ }
+ } catch (Throwable t) {
+ log.error("Try cleanItemListMoreThanSecondGen failed", t);
+ } finally {
+ if (changed) {
+ this.topicQueueMappingManager.getDataVersion().nextVersion();
+ this.topicQueueMappingManager.persist();
+ }
+ log.info("Try cleanItemListMoreThanSecondGen cost {} ms", System.currentTimeMillis() - start);
+ }
+ }
+
+
+
+
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index c442040..e76d25e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -22,27 +22,22 @@ import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.admin.TopicOffset;
-import org.apache.rocketmq.common.admin.TopicStatsTable;
-import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
-import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.store.DefaultMessageStore;
import java.util.HashMap;
import java.util.HashSet;
@@ -74,7 +69,7 @@ public class TopicQueueMappingManager extends ConfigManager {
this.brokerController = brokerController;
}
- public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception {
+ public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force, boolean isClean, boolean flush) throws Exception {
boolean locked = false;
boolean updated = false;
TopicQueueMappingDetail oldDetail = null;
@@ -124,7 +119,7 @@ public class TopicQueueMappingManager extends ConfigManager {
newDetail.getHostedQueues().put(globalId, oldItems);
}
} else {
- TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual);
+ TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual, isClean);
}
}
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
@@ -133,7 +128,8 @@ public class TopicQueueMappingManager extends ConfigManager {
if (locked) {
this.lock.unlock();
}
- if (updated) {
+ if (updated && flush) {
+ this.dataVersion.nextVersion();
this.persist();
log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force);
}
@@ -258,162 +254,4 @@ public class TopicQueueMappingManager extends ConfigManager {
}
}
-
- public void cleanItemListMoreThanSecondGen() {
- String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
- if (!UtilAll.isItTimeToDo(when)) {
- return;
- }
-
- for(String topic : topicQueueMappingTable.keySet()) {
- try {
- TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
- if (mappingDetail == null
- || mappingDetail.getHostedQueues().isEmpty()) {
- continue;
- }
- if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
- log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
- continue;
- }
- Set<String> brokers = new HashSet<>();
- for (List<LogicQueueMappingItem> items : mappingDetail.getHostedQueues().values()) {
- if (items.size() < 2) {
- continue;
- }
- LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
- if (!leaderItem.equals(mappingDetail.getBname())) {
- brokers.add(leaderItem.getBname());
- }
- }
- if (brokers.isEmpty()) {
- continue;
- }
- Map<String, TopicConfigAndQueueMapping> configAndQueueMappingMap = new HashMap<>();
- for (String broker: brokers) {
- GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
- header.setTopic(topic);
- header.setBname(broker);
- try {
- RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
- RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
- if (rpcResponse.getException() != null) {
- throw rpcResponse.getException();
- }
- configAndQueueMappingMap.put(broker, (TopicConfigAndQueueMapping) rpcResponse.getBody());
- } catch (Throwable rt) {
- log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
- }
- }
-
- Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
- Integer queueId = entry.getKey();
- List<LogicQueueMappingItem> items = entry.getValue();
- if (items.size() < 2) {
- continue;
- }
- LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
-
- TopicConfigAndQueueMapping configAndQueueMapping = configAndQueueMappingMap.get(leaderItem.getBname());
- if (configAndQueueMapping == null) {
- continue;
- }
- List<LogicQueueMappingItem> itemsRemote = configAndQueueMapping.getMappingDetail().getHostedQueues().get(queueId);
- //TODO
- }
- } catch (Throwable tt) {
- log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
- } finally {
- UtilAll.sleep(10);
- }
- }
- }
-
-
- public void cleanItemExpired() {
- String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
- if (!UtilAll.isItTimeToDo(when)) {
- return;
- }
- boolean changed = false;
- long start = System.currentTimeMillis();
- try {
- for(String topic : topicQueueMappingTable.keySet()) {
- try {
- TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
- if (mappingDetail == null
- || mappingDetail.getHostedQueues().isEmpty()) {
- continue;
- }
- if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
- log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
- continue;
- }
- Set<String> brokers = new HashSet<>();
- for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
- if (items.size() < 2) {
- continue;
- }
- LogicQueueMappingItem earlistItem = items.get(0);
- brokers.add(earlistItem.getBname());
- }
- Map<String, TopicStatsTable> statsTable = new HashMap<>();
- for (String broker: brokers) {
- GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
- header.setTopic(topic);
- header.setBname(broker);
- try {
- RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
- RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
- if (rpcResponse.getException() != null) {
- throw rpcResponse.getException();
- }
- statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
- } catch (Throwable rt) {
- log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
- }
- }
- for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
- if (items.size() < 2) {
- continue;
- }
- LogicQueueMappingItem earlistItem = items.get(0);
- TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
- if (topicStats == null) {
- continue;
- }
- TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
- if (topicOffset == null) {
- //this may should not happen
- log.warn("Get null topicOffset for {}", earlistItem);
- continue;
- }
- if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
- //TODO be careful of the concurrent problem
- //Should use the lock
- boolean result = items.remove(earlistItem);
- changed = changed || result;
- log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
- }
- }
- } catch (Throwable tt) {
- log.error("Try CleanItemExpired failed for {}", topic, tt);
- } finally {
- UtilAll.sleep(10);
- }
- }
- } catch (Throwable t) {
- log.error("Try cleanItemExpired failed", t);
- } finally {
- if (changed) {
- this.dataVersion.nextVersion();
- this.persist();
- log.info("CleanItemExpired changed");
- }
- log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
- }
- }
-
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
index 3c3f488..6b4faab 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
@@ -93,7 +93,7 @@ public class TopicQueueMappingManagerTest {
Assert.assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size());
for (TopicQueueMappingDetail mappingDetail : mappingDetailMap.values()) {
for (int i = 0; i < 10; i++) {
- topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false);
+ topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, false, true);
}
}
topicQueueMappingManager.persist();
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 13e34ac..58928ea 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -85,8 +85,8 @@ public class MixAll {
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
public static final String REPLY_MESSAGE_FLAG = "reply";
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
- public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logical_queue_broker__";
- public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logical_queue_broker_not_exist__";
+ public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logic_broker__";
+ public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logic_broker_none__";
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 8aa1574..3ef45e0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -193,7 +193,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
}
- public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems, boolean epochEqual) {
+ public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems, boolean epochEqual, boolean isCLean) {
if (oldItems == null || oldItems.isEmpty()) {
return;
}
@@ -205,10 +205,15 @@ public class TopicQueueMappingUtils {
LogicQueueMappingItem newItem = newItems.get(inew);
LogicQueueMappingItem oldItem = oldItems.get(iold);
if (newItem.getGen() < oldItem.getGen()) {
+ //the old one may have been deleted
inew++;
- continue;
} else if (oldItem.getGen() < newItem.getGen()){
- throw new RuntimeException("The gen is not correct for old item");
+ //the new one may be the "delete one from "
+ if (isCLean) {
+ iold++;
+ } else {
+ throw new RuntimeException("The new item-list has less items than old item-list");
+ }
} else {
assert oldItem.getBname().equals(newItem.getBname());
assert oldItem.getQueueId() == newItem.getQueueId();