You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/06 09:47:52 UTC
[rocketmq] 02/02: Add clean item logic for topic queue mapping
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 12915b807f54c7726fddae5cd0c3e177950850c8
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Dec 6 17:47:28 2021 +0800
Add clean item logic for topic queue mapping
---
.../apache/rocketmq/broker/BrokerController.java | 25 ++++++
.../broker/topic/TopicQueueMappingManager.java | 90 ++++++++++++++++++++++
.../apache/rocketmq/common/admin/TopicOffset.java | 9 +++
.../header/GetTopicStatsInfoRequestHeader.java | 3 +-
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 24 +++++-
.../apache/rocketmq/common/rpc/RpcResponse.java | 2 +-
6 files changed, 150 insertions(+), 3 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9230d95..6ca46dc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -163,6 +163,9 @@ public class BrokerController {
private final BrokerOuterAPI brokerOuterAPI;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread"));
+ //the topic queue mapping is costly, so use an independent executor
+ private final ScheduledExecutorService scheduledForTopicQueueMapping = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+ "BrokerControllerScheduledThread-TopicQueueMapping"));
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> ackThreadPoolQueue;
@@ -498,6 +501,22 @@ public class BrokerController {
}
}, 1, 5, TimeUnit.SECONDS);
+ this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
+ try {
+ this.topicQueueMappingManager.cleanItemListMoreThanSecondGen();
+ } catch (Throwable t) {
+ log.error("ScheduledTask cleanItemListMoreThanSecondGen failed", t);
+ }
+ }, 1, 5, TimeUnit.MINUTES);
+
+ this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
+ try {
+ this.topicQueueMappingManager.cleanItemExpired();
+ } catch (Throwable t) {
+ log.error("ScheduledTask cleanItemExpired failed", t);
+ }
+ }, 1, 5, TimeUnit.MINUTES);
+
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
@@ -892,6 +911,12 @@ public class BrokerController {
} catch (InterruptedException e) {
}
+ this.scheduledForTopicQueueMapping.shutdown();
+ try {
+ this.scheduledForTopicQueueMapping.awaitTermination(5000, TimeUnit.MILLISECONDS);
+ } catch (Throwable ignored) {
+ }
+
this.unregisterBrokerAll();
if (this.sendMessageExecutor != null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index c484bcf..9be442e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -21,6 +21,14 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -32,10 +40,14 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -272,4 +284,82 @@ public class TopicQueueMappingManager extends ConfigManager {
}
}
+
+ public void cleanItemExpired() {
+ String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
+ if (!UtilAll.isItTimeToDo(when)) {
+ return;
+ }
+ boolean changed = false;
+ long start = System.currentTimeMillis();
+ try {
+ for(String topic : topicQueueMappingTable.keySet()) {
+ TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
+ if (mappingDetail == null
+ || mappingDetail.getHostedQueues().isEmpty()) {
+ continue;
+ }
+ if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
+ log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
+ continue;
+ }
+ Set<String> brokers = new HashSet<>();
+ for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+ if (items.size() < 2) {
+ continue;
+ }
+ LogicQueueMappingItem earlistItem = items.get(0);
+ brokers.add(earlistItem.getBname());
+ }
+ Map<String, TopicStatsTable> statsTable = new HashMap<>();
+ for (String broker: brokers) {
+ GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+ header.setTopic(topic);
+ header.setBname(broker);
+ try {
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+ } catch (Throwable rt) {
+ log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
+ }
+ }
+ for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
+ if (items.size() < 2) {
+ continue;
+ }
+ LogicQueueMappingItem earlistItem = items.get(0);
+ TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
+ if (topicStats == null) {
+ continue;
+ }
+ TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
+ if (topicOffset == null) {
+ //this may should not happen
+ log.warn("Get null topicOffset for {}", earlistItem);
+ continue;
+ }
+ if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
+ boolean result = items.remove(earlistItem);
+ changed = changed || result;
+ log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
+ }
+ }
+ UtilAll.sleep(10);
+ }
+ } catch (Throwable t) {
+ log.error("Try cleanItemExpired failed", t);
+ } finally {
+ if (changed) {
+ this.dataVersion.nextVersion();
+ this.persist();
+ log.info("CleanItemExpired changed");
+ }
+ log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
+ }
+ }
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java b/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
index 7e66749..8b52a88 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
@@ -44,4 +44,13 @@ public class TopicOffset {
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
+
+ @Override
+ public String toString() {
+ return "TopicOffset{" +
+ "minOffset=" + minOffset +
+ ", maxOffset=" + maxOffset +
+ ", lastUpdateTimestamp=" + lastUpdateTimestamp +
+ '}';
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
index c4cf4de..8e921b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
@@ -17,11 +17,12 @@
package org.apache.rocketmq.common.protocol.header;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetTopicStatsInfoRequestHeader implements CommandCustomHeader {
+public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader {
@CFNotNull
private String topic;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 47ffcc2..83f31e7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -1,8 +1,8 @@
package org.apache.rocketmq.common.rpc;
-import com.alibaba.fastjson.JSON;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -83,6 +83,9 @@ public class RpcClientImpl implements RpcClient {
case RequestCode.QUERY_CONSUMER_OFFSET:
rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
break;
+ case RequestCode.GET_TOPIC_STATS_INFO:
+ rpcResponsePromise = handleGetTopicStats(addr, request, timeoutMs);
+ break;
default:
throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
}
@@ -209,6 +212,25 @@ public class RpcClientImpl implements RpcClient {
return rpcResponsePromise;
}
+ public Promise<RpcResponse> handleGetTopicStats(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+ final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+ RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+ RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+ assert responseCommand != null;
+ switch (responseCommand.getCode()) {
+ case ResponseCode.SUCCESS: {
+ TopicStatsTable topicStatsTable = TopicStatsTable.decode(responseCommand.getBody(), TopicStatsTable.class);
+ rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable));
+ break;
+ }
+ default:{
+ rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
+ }
+ }
+ return rpcResponsePromise;
+ }
+
public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index 5fcde36..2f61329 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -28,7 +28,7 @@ public class RpcResponse {
}
- public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
+ public RpcResponse(int code, CommandCustomHeader header, Object body) {
this.code = code;
this.header = header;
this.body = body;