You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/07 11:39:38 UTC

[rocketmq] 01/02: Add TopicQueueMappingCleanService

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 1ca218f18d62cf7b464bffbe50c2452cafbfc70a
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Dec 7 19:15:51 2021 +0800

    Add TopicQueueMappingCleanService
---
 .../apache/rocketmq/broker/BrokerController.java   |  42 +--
 .../broker/processor/AdminBrokerProcessor.java     |   2 +-
 .../topic/TopicQueueMappingCleanService.java       | 310 +++++++++++++++++++++
 .../broker/topic/TopicQueueMappingManager.java     | 178 +-----------
 .../broker/topic/TopicQueueMappingManagerTest.java |   2 +-
 .../java/org/apache/rocketmq/common/MixAll.java    |   4 +-
 .../common/statictopic/TopicQueueMappingUtils.java |  11 +-
 7 files changed, 342 insertions(+), 207 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6ca46dc..4ef34f9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor;
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
 import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
 import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
@@ -64,11 +65,9 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
 import org.apache.rocketmq.broker.util.ServiceProvider;
-import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.Configuration;
 import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
@@ -76,7 +75,6 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
@@ -87,9 +85,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.common.TlsMode;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
@@ -163,9 +158,6 @@ public class BrokerController {
     private final BrokerOuterAPI brokerOuterAPI;
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
         "BrokerControllerScheduledThread"));
-    //the topic queue mapping is costly, so use an independent executor
-    private final ScheduledExecutorService scheduledForTopicQueueMapping = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-            "BrokerControllerScheduledThread-TopicQueueMapping"));
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
     private final BlockingQueue<Runnable> ackThreadPoolQueue;
@@ -202,6 +194,7 @@ public class BrokerController {
     private InetSocketAddress storeHost;
     private BrokerFastFailure brokerFastFailure;
     private Configuration configuration;
+    private TopicQueueMappingCleanService topicQueueMappingCleanService;
     private FileWatchService fileWatchService;
     private TransactionalMessageCheckService transactionalMessageCheckService;
     private TransactionalMessageService transactionalMessageService;
@@ -501,21 +494,6 @@ public class BrokerController {
                 }
             }, 1, 5, TimeUnit.SECONDS);
 
-            this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
-                try {
-                    this.topicQueueMappingManager.cleanItemListMoreThanSecondGen();
-                } catch (Throwable t) {
-                    log.error("ScheduledTask cleanItemListMoreThanSecondGen failed", t);
-                }
-            }, 1, 5, TimeUnit.MINUTES);
-
-            this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
-                try {
-                    this.topicQueueMappingManager.cleanItemExpired();
-                } catch (Throwable t) {
-                    log.error("ScheduledTask cleanItemExpired failed", t);
-                }
-            }, 1, 5, TimeUnit.MINUTES);
 
             if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
@@ -539,6 +517,8 @@ public class BrokerController {
                 }
             }
 
+            this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
+
             if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                 // Register a listener to reload SslContext
                 try {
@@ -897,6 +877,10 @@ public class BrokerController {
             this.fastRemotingServer.shutdown();
         }
 
+        if (this.topicQueueMappingCleanService != null) {
+            this.topicQueueMappingCleanService.shutdown();
+        }
+
         if (this.fileWatchService != null) {
             this.fileWatchService.shutdown();
         }
@@ -911,12 +895,6 @@ public class BrokerController {
         } catch (InterruptedException e) {
         }
 
-        this.scheduledForTopicQueueMapping.shutdown();
-        try {
-            this.scheduledForTopicQueueMapping.awaitTermination(5000, TimeUnit.MILLISECONDS);
-        } catch (Throwable ignored) {
-        }
-
         this.unregisterBrokerAll();
 
         if (this.sendMessageExecutor != null) {
@@ -1020,6 +998,10 @@ public class BrokerController {
             this.fastRemotingServer.start();
         }
 
+        if (this.topicQueueMappingCleanService != null) {
+            this.topicQueueMappingCleanService.start();
+        }
+
         if (this.fileWatchService != null) {
             this.fileWatchService.start();
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 20dfc85..270c953 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -323,7 +323,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         try {
             this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-            this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force);
+            this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force, false, true);
 
             this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
             response.setCode(ResponseCode.SUCCESS);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
new file mode 100644
index 0000000..91fd60d
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.rpc.RpcClient;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TopicQueueMappingCleanService extends ServiceThread {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private TopicQueueMappingManager topicQueueMappingManager;
+    private BrokerOuterAPI brokerOuterAPI;
+    private RpcClient rpcClient;
+    private MessageStoreConfig messageStoreConfig;
+    private BrokerConfig brokerConfig;
+
+    /**
+     * Creates the clean service and caches the collaborators it reads from the broker controller.
+     *
+     * @param brokerController controller supplying the mapping manager, outer API, RPC client
+     *                         and the broker / message store configuration
+     */
+    public TopicQueueMappingCleanService(BrokerController brokerController) {
+        this.brokerConfig = brokerController.getBrokerConfig();
+        this.messageStoreConfig = brokerController.getMessageStoreConfig();
+        this.topicQueueMappingManager = brokerController.getTopicQueueMappingManager();
+        // The RPC client is obtained from the same outer API instance that is kept for route lookups.
+        final BrokerOuterAPI outerAPI = brokerController.getBrokerOuterAPI();
+        this.brokerOuterAPI = outerAPI;
+        this.rpcClient = outerAPI.getRpcClient();
+    }
+
+    /**
+     * Returns the simple class name, used as the name of this service.
+     */
+    @Override
+    public String getServiceName() {
+        final Class<?> serviceType = TopicQueueMappingCleanService.class;
+        return serviceType.getSimpleName();
+    }
+
+    /**
+     * Service loop: until the service is stopped, runs both clean passes and then waits
+     * five minutes before the next round. A failure in one pass is logged and does not
+     * prevent the other pass, or later rounds, from running.
+     */
+    @Override
+    public void run() {
+        log.info("Start topic queue mapping clean service thread!");
+        final long pauseMillis = 5L * 60 * 1000;
+        while (true) {
+            if (this.isStopped()) {
+                break;
+            }
+            // Each pass is guarded on its own so that one failing pass cannot skip the other.
+            try {
+                cleanItemExpired();
+            } catch (Throwable expiredFailure) {
+                log.error("topic queue mapping cleanItemExpired failed", expiredFailure);
+            }
+            try {
+                cleanItemListMoreThanSecondGen();
+            } catch (Throwable secondGenFailure) {
+                log.error("topic queue mapping cleanItemListMoreThanSecondGen failed", secondGenFailure);
+            }
+            try {
+                this.waitForRunning(pauseMillis);
+            } catch (Throwable ignore) {
+                // Best effort: a failed wait just leads straight into the next round.
+            }
+        }
+        log.info("End topic queue mapping clean service  thread!");
+    }
+
+
+
+    /**
+     * Drops the oldest mapping item of each logic queue hosted here once the physical queue
+     * that item points at reports no remaining messages.
+     *
+     * <p>The scan only runs when {@link UtilAll#isItTimeToDo(String)} accepts the store's
+     * {@code deleteWhen} setting. For every topic whose mapping detail belongs to this broker,
+     * the brokers named by the oldest items are asked for their topic stats. An oldest item is
+     * removed when its queue reports {@code maxOffset == minOffset} or {@code maxOffset == 0}.
+     * Queues with fewer than two items are skipped, so the newest item is never removed.
+     *
+     * <p>The trimmed item lists are submitted as a new mapping detail through
+     * {@link TopicQueueMappingManager#updateTopicQueueMapping} with {@code isClean=true} and
+     * {@code flush=false}. The data version is bumped and the table persisted once at the end,
+     * and only if at least one topic was actually updated.
+     */
+    public void cleanItemExpired() {
+        String when = messageStoreConfig.getDeleteWhen();
+        if (!UtilAll.isItTimeToDo(when)) {
+            return;
+        }
+        boolean changed = false;
+        long start = System.currentTimeMillis();
+        try {
+            for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
+                try {
+                    if (isStopped()) {
+                        break;
+                    }
+                    TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic);
+                    if (mappingDetail == null
+                            || mappingDetail.getHostedQueues().isEmpty()) {
+                        continue;
+                    }
+                    if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) {
+                        log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
+                        continue;
+                    }
+                    // Collect the brokers that hold the oldest item of any multi-item queue.
+                    Set<String> brokers = new HashSet<>();
+                    for (List<LogicQueueMappingItem> items : mappingDetail.getHostedQueues().values()) {
+                        if (items.size() < 2) {
+                            continue;
+                        }
+                        brokers.add(items.get(0).getBname());
+                    }
+                    if (brokers.isEmpty()) {
+                        // No queue has more than one item, so there is nothing that could be removed.
+                        continue;
+                    }
+                    // Ask each of those brokers once for the topic stats; a failing broker is skipped.
+                    Map<String, TopicStatsTable> statsTable = new HashMap<>();
+                    for (String broker : brokers) {
+                        GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+                        header.setTopic(topic);
+                        header.setBname(broker);
+                        try {
+                            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+                            RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
+                            if (rpcResponse.getException() != null) {
+                                throw rpcResponse.getException();
+                            }
+                            statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+                        } catch (Throwable rt) {
+                            log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+                        }
+                    }
+                    // Queue id -> item list with the expired oldest item removed.
+                    Map<Integer, List<LogicQueueMappingItem>> trimmedQueues = new HashMap<>();
+                    for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
+                        List<LogicQueueMappingItem> items = entry.getValue();
+                        if (items.size() < 2) {
+                            continue;
+                        }
+                        LogicQueueMappingItem earliestItem = items.get(0);
+                        TopicStatsTable topicStats = statsTable.get(earliestItem.getBname());
+                        if (topicStats == null) {
+                            continue;
+                        }
+                        TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earliestItem.getBname(), earliestItem.getQueueId()));
+                        if (topicOffset == null) {
+                            // Not expected: the remote stats carry no entry for the queue the item points at.
+                            log.warn("Get null topicOffset for {}", earliestItem);
+                            continue;
+                        }
+                        // A negative maxOffset is deliberately not treated as empty; it may indicate an error.
+                        if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()
+                            || topicOffset.getMaxOffset() == 0) {
+                            List<LogicQueueMappingItem> newItems = new ArrayList<>(items);
+                            boolean result = newItems.remove(earliestItem);
+                            if (result) {
+                                trimmedQueues.put(entry.getKey(), newItems);
+                            }
+                            log.info("The logic queue item {} is removed {} because of {}", earliestItem, result, topicOffset);
+                        }
+                    }
+                    if (!trimmedQueues.isEmpty()) {
+                        // Submit a new detail that actually carries the trimmed lists; passing the
+                        // unmodified detail would leave the expired items in place.
+                        // NOTE(review): assumes the (topic, totalQueues, bname, epoch) constructor of
+                        // TopicQueueMappingDetail - confirm against the class definition.
+                        TopicQueueMappingDetail newMappingDetail = new TopicQueueMappingDetail(
+                            mappingDetail.getTopic(), mappingDetail.getTotalQueues(), mappingDetail.getBname(), mappingDetail.getEpoch());
+                        newMappingDetail.getHostedQueues().putAll(mappingDetail.getHostedQueues());
+                        newMappingDetail.getHostedQueues().putAll(trimmedQueues);
+                        this.topicQueueMappingManager.updateTopicQueueMapping(newMappingDetail, false, true, false);
+                        // Only reached when the manager accepted the update without throwing.
+                        changed = true;
+                    }
+                } catch (Throwable tt) {
+                    log.error("Try CleanItemExpired failed for {}", topic, tt);
+                } finally {
+                    // Short pause between topics.
+                    UtilAll.sleep(10);
+                }
+            }
+        } catch (Throwable t) {
+            log.error("Try cleanItemExpired failed", t);
+        } finally {
+            if (changed) {
+                // Updates were applied with flush=false, so version bump and persist happen once here.
+                this.topicQueueMappingManager.getDataVersion().nextVersion();
+                this.topicQueueMappingManager.persist();
+                log.info("CleanItemExpired changed");
+            }
+            log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
+        }
+    }
+
+    /**
+     * Removes locally hosted logic-queue item lists whose queue has since moved on to yet
+     * another broker.
+     *
+     * <p>The scan only runs when {@link UtilAll#isItTimeToDo(String)} accepts the store's
+     * {@code deleteWhen} setting. For every topic whose mapping detail belongs to this broker:
+     * <ol>
+     *   <li>collect the queue ids whose newest local item names a different broker;</li>
+     *   <li>refresh the topic route from the name server and resolve, per queue id, the broker
+     *       that {@link ClientMetadata} reports for it;</li>
+     *   <li>fetch the mapping detail of each such broker via {@code GET_TOPIC_CONFIG};</li>
+     *   <li>drop the local item list of a queue id when the remote detail has the same total
+     *       queue count and epoch, its newest item for that id names the resolved broker, and
+     *       that broker differs from the one the newest local item names.</li>
+     * </ol>
+     *
+     * <p>The data version is bumped and the table persisted once at the end, and only if an
+     * item list was actually removed.
+     */
+    public void cleanItemListMoreThanSecondGen() {
+        String when = messageStoreConfig.getDeleteWhen();
+        if (!UtilAll.isItTimeToDo(when)) {
+            return;
+        }
+        boolean changed = false;
+        long start = System.currentTimeMillis();
+        try {
+            ClientMetadata clientMetadata = new ClientMetadata();
+            for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
+                try {
+                    if (isStopped()) {
+                        break;
+                    }
+                    TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic);
+                    if (mappingDetail == null
+                            || mappingDetail.getHostedQueues().isEmpty()) {
+                        continue;
+                    }
+                    if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) {
+                        log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
+                        continue;
+                    }
+                    // Queue ids whose newest local item already points at another broker.
+                    Map<Integer, String> qid2CurrLeaderBroker = new HashMap<>();
+                    for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
+                        Integer qId = entry.getKey();
+                        List<LogicQueueMappingItem> items = entry.getValue();
+                        if (items.isEmpty()) {
+                            continue;
+                        }
+                        LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+                        if (!leaderItem.getBname().equals(mappingDetail.getBname())) {
+                            qid2CurrLeaderBroker.put(qId, leaderItem.getBname());
+                        }
+                    }
+                    if (qid2CurrLeaderBroker.isEmpty()) {
+                        continue;
+                    }
+                    // Find the topic route.
+                    TopicRouteData topicRouteData = brokerOuterAPI.getTopicRouteInfoFromNameServer(topic, brokerConfig.getForwardTimeout());
+                    clientMetadata.freshTopicRoute(topic, topicRouteData);
+                    // Find the real leader of each candidate queue id according to the fresh route.
+                    Map<Integer, String> qid2RealLeaderBroker = new HashMap<>();
+                    for (Integer qId : qid2CurrLeaderBroker.keySet()) {
+                        qid2RealLeaderBroker.put(qId, clientMetadata.getBrokerNameFromMessageQueue(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, qId)));
+                    }
+
+                    // Find the mapping detail of each real leader; every distinct broker is asked once.
+                    Map<String, TopicQueueMappingDetail> mappingDetailMap = new HashMap<>();
+                    for (String broker : new HashSet<>(qid2RealLeaderBroker.values())) {
+                        // Skip ids that did not resolve to a concrete broker.
+                        if (broker == null || MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(broker)) {
+                            continue;
+                        }
+                        GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
+                        header.setTopic(topic);
+                        header.setBname(broker);
+                        try {
+                            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
+                            RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
+                            if (rpcResponse.getException() != null) {
+                                throw rpcResponse.getException();
+                            }
+                            TopicConfigAndQueueMapping remoteConfig = (TopicConfigAndQueueMapping) rpcResponse.getBody();
+                            TopicQueueMappingDetail mappingDetailRemote = remoteConfig == null ? null : remoteConfig.getMappingDetail();
+                            // Only trust a detail that the remote broker reports as its own.
+                            if (mappingDetailRemote != null && broker.equals(mappingDetailRemote.getBname())) {
+                                mappingDetailMap.put(broker, mappingDetailRemote);
+                            }
+                        } catch (Throwable rt) {
+                            log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+                        }
+                    }
+                    // Check all the info before deciding what to delete.
+                    Set<Integer> ids2delete = new HashSet<>();
+                    for (Map.Entry<Integer, String> entry : qid2CurrLeaderBroker.entrySet()) {
+                        Integer qId = entry.getKey();
+                        String currLeaderBroker = entry.getValue();
+                        String realLeaderBroker = qid2RealLeaderBroker.get(qId);
+                        TopicQueueMappingDetail remoteMappingDetail = mappingDetailMap.get(realLeaderBroker);
+                        if (remoteMappingDetail == null
+                                || remoteMappingDetail.getTotalQueues() != mappingDetail.getTotalQueues()
+                                || remoteMappingDetail.getEpoch() != mappingDetail.getEpoch()) {
+                            continue;
+                        }
+                        List<LogicQueueMappingItem> items = remoteMappingDetail.getHostedQueues().get(qId);
+                        // The remote detail may not host this queue id at all; a missing list must
+                        // not abort the checks for the remaining ids of the topic.
+                        if (items == null || items.isEmpty()) {
+                            continue;
+                        }
+                        LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+                        if (!realLeaderBroker.equals(leaderItem.getBname())) {
+                            continue;
+                        }
+                        // All checks passed: delete only if the queue moved past the broker we point at.
+                        if (!realLeaderBroker.equals(currLeaderBroker)) {
+                            ids2delete.add(qId);
+                        }
+                    }
+                    for (Integer qid : ids2delete) {
+                        List<LogicQueueMappingItem> items = mappingDetail.getHostedQueues().remove(qid);
+                        if (items != null) {
+                            // Mark as changed only when something was really removed.
+                            changed = true;
+                            log.info("Remove the ItemListMoreThanSecondGen topic {} qid {} items {}", topic, qid, items);
+                        }
+                    }
+                } catch (Throwable tt) {
+                    log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
+                } finally {
+                    // Short pause between topics.
+                    UtilAll.sleep(10);
+                }
+            }
+        } catch (Throwable t) {
+            log.error("Try cleanItemListMoreThanSecondGen failed", t);
+        } finally {
+            if (changed) {
+                this.topicQueueMappingManager.getDataVersion().nextVersion();
+                this.topicQueueMappingManager.persist();
+            }
+            log.info("Try cleanItemListMoreThanSecondGen cost {} ms", System.currentTimeMillis() - start);
+        }
+    }
+
+
+
+
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index c442040..e76d25e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -22,27 +22,22 @@ import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.admin.TopicOffset;
-import org.apache.rocketmq.common.admin.TopicStatsTable;
-import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
-import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
 import org.apache.rocketmq.common.rpc.RpcRequest;
 import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.store.DefaultMessageStore;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -74,7 +69,7 @@ public class TopicQueueMappingManager extends ConfigManager {
         this.brokerController = brokerController;
     }
 
-    public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception {
+    public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force, boolean isClean, boolean flush) throws Exception {
         boolean locked = false;
         boolean updated = false;
         TopicQueueMappingDetail oldDetail = null;
@@ -124,7 +119,7 @@ public class TopicQueueMappingManager extends ConfigManager {
                         newDetail.getHostedQueues().put(globalId, oldItems);
                     }
                 } else {
-                    TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual);
+                    TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual, isClean);
                 }
             }
             topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
@@ -133,7 +128,8 @@ public class TopicQueueMappingManager extends ConfigManager {
             if (locked) {
                 this.lock.unlock();
             }
-            if (updated) {
+            if (updated && flush) {
+                this.dataVersion.nextVersion();
                 this.persist();
                 log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force);
             }
@@ -258,162 +254,4 @@ public class TopicQueueMappingManager extends ConfigManager {
         }
     }
 
-
-    public void cleanItemListMoreThanSecondGen() {
-        String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
-        if (!UtilAll.isItTimeToDo(when)) {
-            return;
-        }
-
-        for(String topic : topicQueueMappingTable.keySet()) {
-            try {
-                TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
-                if (mappingDetail == null
-                        || mappingDetail.getHostedQueues().isEmpty()) {
-                    continue;
-                }
-                if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
-                    log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
-                    continue;
-                }
-                Set<String> brokers = new HashSet<>();
-                for (List<LogicQueueMappingItem> items : mappingDetail.getHostedQueues().values()) {
-                    if (items.size() < 2) {
-                        continue;
-                    }
-                    LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
-                    if (!leaderItem.equals(mappingDetail.getBname())) {
-                        brokers.add(leaderItem.getBname());
-                    }
-                }
-                if (brokers.isEmpty()) {
-                    continue;
-                }
-                Map<String, TopicConfigAndQueueMapping> configAndQueueMappingMap = new HashMap<>();
-                for (String broker: brokers) {
-                    GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
-                    header.setTopic(topic);
-                    header.setBname(broker);
-                    try {
-                        RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
-                        RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
-                        if (rpcResponse.getException() != null) {
-                            throw rpcResponse.getException();
-                        }
-                        configAndQueueMappingMap.put(broker, (TopicConfigAndQueueMapping) rpcResponse.getBody());
-                    } catch (Throwable rt) {
-                        log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
-                    }
-                }
-
-                Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
-                while (it.hasNext()) {
-                    Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
-                    Integer queueId = entry.getKey();
-                    List<LogicQueueMappingItem> items = entry.getValue();
-                    if (items.size() < 2) {
-                        continue;
-                    }
-                    LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
-
-                    TopicConfigAndQueueMapping configAndQueueMapping =  configAndQueueMappingMap.get(leaderItem.getBname());
-                    if (configAndQueueMapping == null) {
-                        continue;
-                    }
-                    List<LogicQueueMappingItem> itemsRemote = configAndQueueMapping.getMappingDetail().getHostedQueues().get(queueId);
-                    //TODO
-                }
-            } catch (Throwable tt) {
-                log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
-            } finally {
-                UtilAll.sleep(10);
-            }
-        }
-    }
-
-
-    public void cleanItemExpired() {
-        String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
-        if (!UtilAll.isItTimeToDo(when)) {
-            return;
-        }
-        boolean changed = false;
-        long start = System.currentTimeMillis();
-        try {
-            for(String topic : topicQueueMappingTable.keySet()) {
-                try {
-                    TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
-                    if (mappingDetail == null
-                            || mappingDetail.getHostedQueues().isEmpty()) {
-                        continue;
-                    }
-                    if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
-                        log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
-                        continue;
-                    }
-                    Set<String> brokers = new HashSet<>();
-                    for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
-                        if (items.size() < 2) {
-                            continue;
-                        }
-                        LogicQueueMappingItem earlistItem = items.get(0);
-                        brokers.add(earlistItem.getBname());
-                    }
-                    Map<String, TopicStatsTable> statsTable = new HashMap<>();
-                    for (String broker: brokers) {
-                        GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
-                        header.setTopic(topic);
-                        header.setBname(broker);
-                        try {
-                            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
-                            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
-                            if (rpcResponse.getException() != null) {
-                                throw rpcResponse.getException();
-                            }
-                            statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
-                        } catch (Throwable rt) {
-                            log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
-                        }
-                    }
-                    for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
-                        if (items.size() < 2) {
-                            continue;
-                        }
-                        LogicQueueMappingItem earlistItem = items.get(0);
-                        TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
-                        if (topicStats == null) {
-                            continue;
-                        }
-                        TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
-                        if (topicOffset == null) {
-                            //this may should not happen
-                            log.warn("Get null topicOffset for {}", earlistItem);
-                            continue;
-                        }
-                        if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
-                            //TODO be careful of the concurrent problem
-                            //Should use the lock
-                            boolean result = items.remove(earlistItem);
-                            changed = changed || result;
-                            log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
-                        }
-                    }
-                } catch (Throwable tt) {
-                    log.error("Try CleanItemExpired failed for {}", topic, tt);
-                } finally {
-                    UtilAll.sleep(10);
-                }
-            }
-        } catch (Throwable t) {
-            log.error("Try cleanItemExpired failed", t);
-        } finally {
-            if (changed) {
-                this.dataVersion.nextVersion();
-                this.persist();
-                log.info("CleanItemExpired changed");
-            }
-            log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
-        }
-    }
-
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
index 3c3f488..6b4faab 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
@@ -93,7 +93,7 @@ public class TopicQueueMappingManagerTest {
             Assert.assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size());
             for (TopicQueueMappingDetail mappingDetail : mappingDetailMap.values()) {
                 for (int i = 0; i < 10; i++) {
-                    topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false);
+                    topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, false, true);
                 }
             }
             topicQueueMappingManager.persist();
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 13e34ac..58928ea 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -85,8 +85,8 @@ public class MixAll {
     public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
     public static final String REPLY_MESSAGE_FLAG = "reply";
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
-    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logical_queue_broker__";
-    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logical_queue_broker_not_exist__";
+    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logic_broker__";
+    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logic_broker_none__";
 
     public static String getWSAddr() {
         String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 8aa1574..3ef45e0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -193,7 +193,7 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
     }
 
-    public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems, boolean epochEqual) {
+    public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems, boolean epochEqual, boolean isCLean) {
         if (oldItems == null || oldItems.isEmpty()) {
             return;
         }
@@ -205,10 +205,15 @@ public class TopicQueueMappingUtils {
             LogicQueueMappingItem newItem = newItems.get(inew);
             LogicQueueMappingItem oldItem = oldItems.get(iold);
             if (newItem.getGen() < oldItem.getGen()) {
+                //the old one may have been deleted
                 inew++;
-                continue;
             } else if (oldItem.getGen() < newItem.getGen()){
-                throw new RuntimeException("The gen is not correct for old item");
+                //the new list may have had this old item removed by a clean operation
+                if (isCLean) {
+                    iold++;
+                } else {
+                   throw new RuntimeException("The new item-list has less items than old item-list");
+                }
             } else {
                 assert oldItem.getBname().equals(newItem.getBname());
                 assert oldItem.getQueueId() == newItem.getQueueId();