You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/14 05:52:41 UTC
[rocketmq] branch develop updated: [ISSUE #5061] Fix the slave broker cannot synchronize timer metrics (#5065)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d407ec679 [ISSUE #5061] Fix the slave broker cannot synchronize timer metrics (#5065)
d407ec679 is described below
commit d407ec679ac8a4e2e4b63d4033c10d0caba5f7bb
Author: lizhimins <70...@qq.com>
AuthorDate: Wed Sep 14 13:52:28 2022 +0800
[ISSUE #5061] Fix the slave broker cannot synchronize timer metrics (#5065)
---
.../broker/processor/AdminBrokerProcessor.java | 36 ++++++++++++++++++++++
.../apache/rocketmq/store/timer/TimerMetrics.java | 11 +++++--
2 files changed, 44 insertions(+), 3 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0ec1c226a..cba1622fd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -181,6 +181,8 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.timer.TimerCheckpoint;
+import org.apache.rocketmq.store.timer.TimerMessageStore;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
@@ -202,6 +204,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return this.deleteTopic(ctx, request);
case RequestCode.GET_ALL_TOPIC_CONFIG:
return this.getAllTopicConfig(ctx, request);
+ case RequestCode.GET_TIMER_CHECK_POINT:
+ return this.getTimerCheckPoint(ctx, request);
+ case RequestCode.GET_TIMER_METRICS:
+ return this.getTimerMetrics(ctx, request);
case RequestCode.UPDATE_BROKER_CONFIG:
return this.updateBrokerConfig(ctx, request);
case RequestCode.GET_BROKER_CONFIG:
@@ -704,6 +710,36 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
+ private RemotingCommand getTimerCheckPoint(ChannelHandlerContext ctx, RemotingCommand request) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "Unknown");
+ TimerCheckpoint timerCheckpoint = this.brokerController.getTimerCheckpoint();
+ if (null == timerCheckpoint) {
+ LOGGER.error("AdminBrokerProcessor#getTimerCheckPoint: checkpoint is null, caller={}", ctx.channel().remoteAddress());
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("The checkpoint is null");
+ return response;
+ }
+ response.setBody(TimerCheckpoint.encode(timerCheckpoint).array());
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
+ private RemotingCommand getTimerMetrics(ChannelHandlerContext ctx, RemotingCommand request) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "Unknown");
+ TimerMessageStore timerMessageStore = this.brokerController.getMessageStore().getTimerMessageStore();
+ if (null == timerMessageStore) {
+ LOGGER.error("The timer message store is null, client: {}", ctx.channel().remoteAddress());
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("The timer message store is null");
+ return response;
+ }
+ response.setBody(timerMessageStore.getTimerMetrics().encode().getBytes(StandardCharsets.UTF_8));
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
index f86d2cf6b..a1b41d5c3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
@@ -56,6 +56,7 @@ public class TimerMetrics extends ConfigManager {
private final ConcurrentMap<Integer, Metric> timingDistribution =
new ConcurrentHashMap<>(1024);
+
public List<Integer> timerDist = new ArrayList<Integer>() {{
add(5);
add(60);
@@ -142,7 +143,7 @@ public class TimerMetrics extends ConfigManager {
@Override
public String encode() {
- return null;
+ return encode(false);
}
@Override
@@ -154,16 +155,20 @@ public class TimerMetrics extends ConfigManager {
public void decode(String jsonString) {
if (jsonString != null) {
TimerMetricsSerializeWrapper timerMetricsSerializeWrapper =
- TimerMetricsSerializeWrapper.fromJson(jsonString, TimerMetricsSerializeWrapper.class);
+ TimerMetricsSerializeWrapper.fromJson(jsonString, TimerMetricsSerializeWrapper.class);
if (timerMetricsSerializeWrapper != null) {
this.timingCount.putAll(timerMetricsSerializeWrapper.getTimingCount());
+ this.dataVersion.assignNewOne(timerMetricsSerializeWrapper.getDataVersion());
}
}
}
@Override
public String encode(boolean prettyFormat) {
- return null;
+ TimerMetricsSerializeWrapper metricsSerializeWrapper = new TimerMetricsSerializeWrapper();
+ metricsSerializeWrapper.setDataVersion(this.dataVersion);
+ metricsSerializeWrapper.setTimingCount(this.timingCount);
+ return metricsSerializeWrapper.toJson(prettyFormat);
}
public DataVersion getDataVersion() {