You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/06 12:27:55 UTC

[rocketmq] 02/02: Try polishing the clear logic, need more polishment

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit c06564f68b2eb90b2774c736241faa15ac924c85
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Dec 6 20:27:16 2021 +0800

    Try polishing the clear logic, need more polishment
---
 .../broker/topic/TopicQueueMappingManager.java     | 161 ++++++++++++++-------
 .../header/GetTopicConfigRequestHeader.java        |   3 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |  18 ++-
 3 files changed, 121 insertions(+), 61 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 1c11fde..c442040 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -26,10 +26,12 @@ import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
 import org.apache.rocketmq.common.rpc.RpcRequest;
 import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -258,45 +260,13 @@ public class TopicQueueMappingManager extends ConfigManager {
 
 
     public void cleanItemListMoreThanSecondGen() {
-        for(String topic : topicQueueMappingTable.keySet()) {
-            TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
-            if (mappingDetail == null
-                    || mappingDetail.getHostedQueues().isEmpty()) {
-                continue;
-            }
-            if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
-                log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
-                continue;
-            }
-            Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
-            while (it.hasNext()) {
-                Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
-                Integer queueId = entry.getKey();
-                List<LogicQueueMappingItem> items = entry.getValue();
-                if (items.size() <= 2) {
-                    continue;
-                }
-                LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
-                LogicQueueMappingItem secLeaderItem = items.get(items.size() - 2);
-                if (!leaderItem.getBname().equals(mappingDetail.getBname())
-                        && !secLeaderItem.getBname().equals(mappingDetail.getBname())) {
-                    it.remove();
-                    log.info("The topic queue {} {} is expired with items {}", mappingDetail.getTopic(), queueId, items);
-                }
-            }
-        }
-    }
-
-
-    public void cleanItemExpired() {
         String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
         if (!UtilAll.isItTimeToDo(when)) {
             return;
         }
-        boolean changed = false;
-        long start = System.currentTimeMillis();
-        try {
-            for(String topic : topicQueueMappingTable.keySet()) {
+
+        for(String topic : topicQueueMappingTable.keySet()) {
+            try {
                 TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
                 if (mappingDetail == null
                         || mappingDetail.getHostedQueues().isEmpty()) {
@@ -307,51 +277,132 @@ public class TopicQueueMappingManager extends ConfigManager {
                     continue;
                 }
                 Set<String> brokers = new HashSet<>();
-                for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+                for (List<LogicQueueMappingItem> items : mappingDetail.getHostedQueues().values()) {
                     if (items.size() < 2) {
                         continue;
                     }
-                    LogicQueueMappingItem earlistItem = items.get(0);
-                    brokers.add(earlistItem.getBname());
+                    LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+                    if (!leaderItem.equals(mappingDetail.getBname())) {
+                        brokers.add(leaderItem.getBname());
+                    }
                 }
-                Map<String, TopicStatsTable> statsTable = new HashMap<>();
+                if (brokers.isEmpty()) {
+                    continue;
+                }
+                Map<String, TopicConfigAndQueueMapping> configAndQueueMappingMap = new HashMap<>();
                 for (String broker: brokers) {
-                    GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+                    GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
                     header.setTopic(topic);
                     header.setBname(broker);
                     try {
-                        RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+                        RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
                         RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
                         if (rpcResponse.getException() != null) {
                             throw rpcResponse.getException();
                         }
-                        statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+                        configAndQueueMappingMap.put(broker, (TopicConfigAndQueueMapping) rpcResponse.getBody());
                     } catch (Throwable rt) {
                         log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
                     }
                 }
-                for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+
+                Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
+                while (it.hasNext()) {
+                    Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
+                    Integer queueId = entry.getKey();
+                    List<LogicQueueMappingItem> items = entry.getValue();
                     if (items.size() < 2) {
                         continue;
                     }
-                    LogicQueueMappingItem earlistItem = items.get(0);
-                    TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
-                    if (topicStats == null) {
+                    LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+
+                    TopicConfigAndQueueMapping configAndQueueMapping =  configAndQueueMappingMap.get(leaderItem.getBname());
+                    if (configAndQueueMapping == null) {
                         continue;
                     }
-                    TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
-                    if (topicOffset == null) {
-                        //this may should not happen
-                        log.warn("Get null topicOffset for {}", earlistItem);
+                    List<LogicQueueMappingItem> itemsRemote = configAndQueueMapping.getMappingDetail().getHostedQueues().get(queueId);
+                    //TODO
+                }
+            } catch (Throwable tt) {
+                log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
+            } finally {
+                UtilAll.sleep(10);
+            }
+        }
+    }
+
+
+    public void cleanItemExpired() {
+        String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
+        if (!UtilAll.isItTimeToDo(when)) {
+            return;
+        }
+        boolean changed = false;
+        long start = System.currentTimeMillis();
+        try {
+            for(String topic : topicQueueMappingTable.keySet()) {
+                try {
+                    TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
+                    if (mappingDetail == null
+                            || mappingDetail.getHostedQueues().isEmpty()) {
+                        continue;
+                    }
+                    if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
+                        log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
                         continue;
                     }
-                    if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
-                        boolean result = items.remove(earlistItem);
-                        changed = changed || result;
-                        log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+                    Set<String> brokers = new HashSet<>();
+                    for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+                        if (items.size() < 2) {
+                            continue;
+                        }
+                        LogicQueueMappingItem earlistItem = items.get(0);
+                        brokers.add(earlistItem.getBname());
+                    }
+                    Map<String, TopicStatsTable> statsTable = new HashMap<>();
+                    for (String broker: brokers) {
+                        GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+                        header.setTopic(topic);
+                        header.setBname(broker);
+                        try {
+                            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+                            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+                            if (rpcResponse.getException() != null) {
+                                throw rpcResponse.getException();
+                            }
+                            statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+                        } catch (Throwable rt) {
+                            log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+                        }
+                    }
+                    for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+                        if (items.size() < 2) {
+                            continue;
+                        }
+                        LogicQueueMappingItem earlistItem = items.get(0);
+                        TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
+                        if (topicStats == null) {
+                            continue;
+                        }
+                        TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
+                        if (topicOffset == null) {
+                            //this may should not happen
+                            log.warn("Get null topicOffset for {}", earlistItem);
+                            continue;
+                        }
+                        if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
+                            //TODO be careful of the concurrent problem
+                            //Should use the lock
+                            boolean result = items.remove(earlistItem);
+                            changed = changed || result;
+                            log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+                        }
                     }
+                } catch (Throwable tt) {
+                    log.error("Try CleanItemExpired failed for {}", topic, tt);
+                } finally {
+                    UtilAll.sleep(10);
                 }
-                UtilAll.sleep(10);
             }
         } catch (Throwable t) {
             log.error("Try cleanItemExpired failed", t);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
index 2b5d040..b282efa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
@@ -17,11 +17,12 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetTopicConfigRequestHeader implements CommandCustomHeader {
+public class GetTopicConfigRequestHeader extends RpcRequestHeader {
     @Override
     public void checkFields() throws RemotingCommandException {
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 83f31e7..6d75df9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -2,6 +2,7 @@ package org.apache.rocketmq.common.rpc;
 
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.util.concurrent.Promise;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -9,13 +10,19 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -84,7 +91,10 @@ public class RpcClientImpl implements RpcClient {
                     rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
                     break;
                 case RequestCode.GET_TOPIC_STATS_INFO:
-                    rpcResponsePromise = handleGetTopicStats(addr, request, timeoutMs);
+                    rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicStatsTable.class);
+                    break;
+                case RequestCode.GET_TOPIC_CONFIG:
+                    rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicConfigAndQueueMapping.class);
                     break;
                 default:
                     throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
@@ -212,16 +222,14 @@ public class RpcClientImpl implements RpcClient {
         return rpcResponsePromise;
     }
 
-    public Promise<RpcResponse> handleGetTopicStats(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+    public Promise<RpcResponse> handleCommonBodyRequest(final String addr, RpcRequest rpcRequest, long timeoutMillis, Class bodyClass) throws Exception {
         final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
-
         RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
         RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
         assert responseCommand != null;
         switch (responseCommand.getCode()) {
             case ResponseCode.SUCCESS: {
-                TopicStatsTable topicStatsTable = TopicStatsTable.decode(responseCommand.getBody(), TopicStatsTable.class);
-                rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable));
+                rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(requestCommand.getBody(), bodyClass)));
                 break;
             }
             default:{