You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/06 09:47:52 UTC

[rocketmq] 02/02: Add clean item logic for topic queue mapping

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 12915b807f54c7726fddae5cd0c3e177950850c8
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Dec 6 17:47:28 2021 +0800

    Add clean item logic for topic queue mapping
---
 .../apache/rocketmq/broker/BrokerController.java   | 25 ++++++
 .../broker/topic/TopicQueueMappingManager.java     | 90 ++++++++++++++++++++++
 .../apache/rocketmq/common/admin/TopicOffset.java  |  9 +++
 .../header/GetTopicStatsInfoRequestHeader.java     |  3 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  | 24 +++++-
 .../apache/rocketmq/common/rpc/RpcResponse.java    |  2 +-
 6 files changed, 150 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9230d95..6ca46dc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -163,6 +163,9 @@ public class BrokerController {
     private final BrokerOuterAPI brokerOuterAPI;
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
         "BrokerControllerScheduledThread"));
+    //the topic queue mapping is costly, so use an independent executor
+    private final ScheduledExecutorService scheduledForTopicQueueMapping = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "BrokerControllerScheduledThread-TopicQueueMapping"));
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
     private final BlockingQueue<Runnable> ackThreadPoolQueue;
@@ -498,6 +501,22 @@ public class BrokerController {
                 }
             }, 1, 5, TimeUnit.SECONDS);
 
+            this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
+                try {
+                    this.topicQueueMappingManager.cleanItemListMoreThanSecondGen();
+                } catch (Throwable t) {
+                    log.error("ScheduledTask cleanItemListMoreThanSecondGen failed", t);
+                }
+            }, 1, 5, TimeUnit.MINUTES);
+
+            this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
+                try {
+                    this.topicQueueMappingManager.cleanItemExpired();
+                } catch (Throwable t) {
+                    log.error("ScheduledTask cleanItemExpired failed", t);
+                }
+            }, 1, 5, TimeUnit.MINUTES);
+
             if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                     if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
@@ -892,6 +911,12 @@ public class BrokerController {
         } catch (InterruptedException e) {
         }
 
+        this.scheduledForTopicQueueMapping.shutdown();
+        try {
+            this.scheduledForTopicQueueMapping.awaitTermination(5000, TimeUnit.MILLISECONDS);
+        } catch (Throwable ignored) {
+        }
+
         this.unregisterBrokerAll();
 
         if (this.sendMessageExecutor != null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index c484bcf..9be442e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -21,6 +21,14 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -32,10 +40,14 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.DefaultMessageStore;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -272,4 +284,82 @@ public class TopicQueueMappingManager extends ConfigManager {
         }
     }
 
+
+    public void cleanItemExpired() {
+        String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
+        if (!UtilAll.isItTimeToDo(when)) {
+            return;
+        }
+        boolean changed = false;
+        long start = System.currentTimeMillis();
+        try {
+            for(String topic : topicQueueMappingTable.keySet()) {
+                TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
+                if (mappingDetail == null
+                        || mappingDetail.getHostedQueues().isEmpty()) {
+                    continue;
+                }
+                if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
+                    log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
+                    continue;
+                }
+                Set<String> brokers = new HashSet<>();
+                for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+                    if (items.size() < 2) {
+                        continue;
+                    }
+                    LogicQueueMappingItem earlistItem = items.get(0);
+                    brokers.add(earlistItem.getBname());
+                }
+                Map<String, TopicStatsTable> statsTable = new HashMap<>();
+                for (String broker: brokers) {
+                    GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+                    header.setTopic(topic);
+                    header.setBname(broker);
+                    try {
+                        RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+                        RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+                        if (rpcResponse.getException() != null) {
+                            throw rpcResponse.getException();
+                        }
+                        statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+                    } catch (Throwable rt) {
+                        log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+                    }
+                }
+                for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+                    if (items.size() < 2) {
+                        continue;
+                    }
+                    LogicQueueMappingItem earlistItem = items.get(0);
+                    TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
+                    if (topicStats == null) {
+                        continue;
+                    }
+                    TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
+                    if (topicOffset == null) {
+                        //this may should not happen
+                        log.warn("Get null topicOffset for {}", earlistItem);
+                        continue;
+                    }
+                    if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
+                        boolean result = items.remove(earlistItem);
+                        changed = changed || result;
+                        log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+                    }
+                }
+                UtilAll.sleep(10);
+            }
+        } catch (Throwable t) {
+            log.error("Try cleanItemExpired failed", t);
+        } finally {
+            if (changed) {
+                this.dataVersion.nextVersion();
+                this.persist();
+                log.info("CleanItemExpired changed");
+            }
+            log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
+        }
+    }
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java b/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
index 7e66749..8b52a88 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
@@ -44,4 +44,13 @@ public class TopicOffset {
     public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
         this.lastUpdateTimestamp = lastUpdateTimestamp;
     }
+
+    @Override
+    public String toString() {
+        return "TopicOffset{" +
+                "minOffset=" + minOffset +
+                ", maxOffset=" + maxOffset +
+                ", lastUpdateTimestamp=" + lastUpdateTimestamp +
+                '}';
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
index c4cf4de..8e921b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
@@ -17,11 +17,12 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetTopicStatsInfoRequestHeader implements CommandCustomHeader {
+public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String topic;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 47ffcc2..83f31e7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -1,8 +1,8 @@
 package org.apache.rocketmq.common.rpc;
 
-import com.alibaba.fastjson.JSON;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.util.concurrent.Promise;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -83,6 +83,9 @@ public class RpcClientImpl implements RpcClient {
                 case RequestCode.QUERY_CONSUMER_OFFSET:
                     rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
                     break;
+                case RequestCode.GET_TOPIC_STATS_INFO:
+                    rpcResponsePromise = handleGetTopicStats(addr, request, timeoutMs);
+                    break;
                 default:
                     throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
             }
@@ -209,6 +212,25 @@ public class RpcClientImpl implements RpcClient {
         return rpcResponsePromise;
     }
 
+    public Promise<RpcResponse> handleGetTopicStats(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+        RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                TopicStatsTable topicStatsTable = TopicStatsTable.decode(responseCommand.getBody(), TopicStatsTable.class);
+                rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable));
+                break;
+            }
+            default:{
+                rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
+            }
+        }
+        return rpcResponsePromise;
+    }
+
     public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
         final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index 5fcde36..2f61329 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -28,7 +28,7 @@ public class RpcResponse   {
 
     }
 
-    public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
+    public RpcResponse(int code, CommandCustomHeader header, Object body) {
         this.code = code;
         this.header = header;
         this.body = body;