You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/06 12:27:55 UTC
[rocketmq] 02/02: Try polishing the clear logic, need more polishment
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit c06564f68b2eb90b2774c736241faa15ac924c85
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Dec 6 20:27:16 2021 +0800
Try polishing the clear logic, need more polishment
---
.../broker/topic/TopicQueueMappingManager.java | 161 ++++++++++++++-------
.../header/GetTopicConfigRequestHeader.java | 3 +-
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 18 ++-
3 files changed, 121 insertions(+), 61 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 1c11fde..c442040 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -26,10 +26,12 @@ import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -258,45 +260,13 @@ public class TopicQueueMappingManager extends ConfigManager {
public void cleanItemListMoreThanSecondGen() {
- for(String topic : topicQueueMappingTable.keySet()) {
- TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
- if (mappingDetail == null
- || mappingDetail.getHostedQueues().isEmpty()) {
- continue;
- }
- if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
- log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
- continue;
- }
- Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
- Integer queueId = entry.getKey();
- List<LogicQueueMappingItem> items = entry.getValue();
- if (items.size() <= 2) {
- continue;
- }
- LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
- LogicQueueMappingItem secLeaderItem = items.get(items.size() - 2);
- if (!leaderItem.getBname().equals(mappingDetail.getBname())
- && !secLeaderItem.getBname().equals(mappingDetail.getBname())) {
- it.remove();
- log.info("The topic queue {} {} is expired with items {}", mappingDetail.getTopic(), queueId, items);
- }
- }
- }
- }
-
-
- public void cleanItemExpired() {
String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
if (!UtilAll.isItTimeToDo(when)) {
return;
}
- boolean changed = false;
- long start = System.currentTimeMillis();
- try {
- for(String topic : topicQueueMappingTable.keySet()) {
+
+ for(String topic : topicQueueMappingTable.keySet()) {
+ try {
TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) {
@@ -307,51 +277,132 @@ public class TopicQueueMappingManager extends ConfigManager {
continue;
}
Set<String> brokers = new HashSet<>();
- for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+ for (List<LogicQueueMappingItem> items : mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) {
continue;
}
- LogicQueueMappingItem earlistItem = items.get(0);
- brokers.add(earlistItem.getBname());
+ LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+ if (!leaderItem.equals(mappingDetail.getBname())) {
+ brokers.add(leaderItem.getBname());
+ }
}
- Map<String, TopicStatsTable> statsTable = new HashMap<>();
+ if (brokers.isEmpty()) {
+ continue;
+ }
+ Map<String, TopicConfigAndQueueMapping> configAndQueueMappingMap = new HashMap<>();
for (String broker: brokers) {
- GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+ GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setBname(broker);
try {
- RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
- statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+ configAndQueueMappingMap.put(broker, (TopicConfigAndQueueMapping) rpcResponse.getBody());
} catch (Throwable rt) {
log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
}
}
- for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+
+ Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
+ Integer queueId = entry.getKey();
+ List<LogicQueueMappingItem> items = entry.getValue();
if (items.size() < 2) {
continue;
}
- LogicQueueMappingItem earlistItem = items.get(0);
- TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
- if (topicStats == null) {
+ LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
+
+ TopicConfigAndQueueMapping configAndQueueMapping = configAndQueueMappingMap.get(leaderItem.getBname());
+ if (configAndQueueMapping == null) {
continue;
}
- TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
- if (topicOffset == null) {
- //this may should not happen
- log.warn("Get null topicOffset for {}", earlistItem);
+ List<LogicQueueMappingItem> itemsRemote = configAndQueueMapping.getMappingDetail().getHostedQueues().get(queueId);
+ //TODO
+ }
+ } catch (Throwable tt) {
+ log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
+ } finally {
+ UtilAll.sleep(10);
+ }
+ }
+ }
+
+
+ public void cleanItemExpired() {
+ String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
+ if (!UtilAll.isItTimeToDo(when)) {
+ return;
+ }
+ boolean changed = false;
+ long start = System.currentTimeMillis();
+ try {
+ for(String topic : topicQueueMappingTable.keySet()) {
+ try {
+ TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
+ if (mappingDetail == null
+ || mappingDetail.getHostedQueues().isEmpty()) {
+ continue;
+ }
+ if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
+ log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
continue;
}
- if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
- boolean result = items.remove(earlistItem);
- changed = changed || result;
- log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+ Set<String> brokers = new HashSet<>();
+ for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+ if (items.size() < 2) {
+ continue;
+ }
+ LogicQueueMappingItem earlistItem = items.get(0);
+ brokers.add(earlistItem.getBname());
+ }
+ Map<String, TopicStatsTable> statsTable = new HashMap<>();
+ for (String broker: brokers) {
+ GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+ header.setTopic(topic);
+ header.setBname(broker);
+ try {
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+ } catch (Throwable rt) {
+ log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+ }
+ }
+ for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+ if (items.size() < 2) {
+ continue;
+ }
+ LogicQueueMappingItem earlistItem = items.get(0);
+ TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
+ if (topicStats == null) {
+ continue;
+ }
+ TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
+ if (topicOffset == null) {
+ //this may should not happen
+ log.warn("Get null topicOffset for {}", earlistItem);
+ continue;
+ }
+ if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
+ //TODO be careful of the concurrent problem
+ //Should use the lock
+ boolean result = items.remove(earlistItem);
+ changed = changed || result;
+ log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+ }
}
+ } catch (Throwable tt) {
+ log.error("Try CleanItemExpired failed for {}", topic, tt);
+ } finally {
+ UtilAll.sleep(10);
}
- UtilAll.sleep(10);
}
} catch (Throwable t) {
log.error("Try cleanItemExpired failed", t);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
index 2b5d040..b282efa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
@@ -17,11 +17,12 @@
package org.apache.rocketmq.common.protocol.header;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetTopicConfigRequestHeader implements CommandCustomHeader {
+public class GetTopicConfigRequestHeader extends RpcRequestHeader {
@Override
public void checkFields() throws RemotingCommandException {
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 83f31e7..6d75df9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -2,6 +2,7 @@ package org.apache.rocketmq.common.rpc;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -9,13 +10,19 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.ArrayList;
import java.util.List;
@@ -84,7 +91,10 @@ public class RpcClientImpl implements RpcClient {
rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
break;
case RequestCode.GET_TOPIC_STATS_INFO:
- rpcResponsePromise = handleGetTopicStats(addr, request, timeoutMs);
+ rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicStatsTable.class);
+ break;
+ case RequestCode.GET_TOPIC_CONFIG:
+ rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicConfigAndQueueMapping.class);
break;
default:
throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
@@ -212,16 +222,14 @@ public class RpcClientImpl implements RpcClient {
return rpcResponsePromise;
}
- public Promise<RpcResponse> handleGetTopicStats(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+ public Promise<RpcResponse> handleCommonBodyRequest(final String addr, RpcRequest rpcRequest, long timeoutMillis, Class bodyClass) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
-
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
- TopicStatsTable topicStatsTable = TopicStatsTable.decode(responseCommand.getBody(), TopicStatsTable.class);
- rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable));
+ rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(requestCommand.getBody(), bodyClass)));
break;
}
default:{