You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/14 05:52:41 UTC

[rocketmq] branch develop updated: [ISSUE #5061] Fix the slave broker cannot synchronize timer metrics (#5065)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d407ec679 [ISSUE #5061] Fix the slave broker cannot synchronize timer metrics (#5065)
d407ec679 is described below

commit d407ec679ac8a4e2e4b63d4033c10d0caba5f7bb
Author: lizhimins <70...@qq.com>
AuthorDate: Wed Sep 14 13:52:28 2022 +0800

    [ISSUE #5061] Fix the slave broker cannot synchronize timer metrics (#5065)
---
 .../broker/processor/AdminBrokerProcessor.java     | 36 ++++++++++++++++++++++
 .../apache/rocketmq/store/timer/TimerMetrics.java  | 11 +++++--
 2 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0ec1c226a..cba1622fd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -181,6 +181,8 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.ReferredIterator;
 import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.timer.TimerCheckpoint;
+import org.apache.rocketmq.store.timer.TimerMessageStore;
 
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
@@ -202,6 +204,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 return this.deleteTopic(ctx, request);
             case RequestCode.GET_ALL_TOPIC_CONFIG:
                 return this.getAllTopicConfig(ctx, request);
+            case RequestCode.GET_TIMER_CHECK_POINT:
+                return this.getTimerCheckPoint(ctx, request);
+            case RequestCode.GET_TIMER_METRICS:
+                return this.getTimerMetrics(ctx, request);
             case RequestCode.UPDATE_BROKER_CONFIG:
                 return this.updateBrokerConfig(ctx, request);
             case RequestCode.GET_BROKER_CONFIG:
@@ -704,6 +710,36 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return response;
     }
 
+    private RemotingCommand getTimerCheckPoint(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "Unknown");
+        TimerCheckpoint timerCheckpoint = this.brokerController.getTimerCheckpoint();
+        if (null == timerCheckpoint) {
+            LOGGER.error("AdminBrokerProcessor#getTimerCheckPoint: checkpoint is null, caller={}", ctx.channel().remoteAddress());
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("The checkpoint is null");
+            return response;
+        }
+        response.setBody(TimerCheckpoint.encode(timerCheckpoint).array());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    private RemotingCommand getTimerMetrics(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "Unknown");
+        TimerMessageStore timerMessageStore = this.brokerController.getMessageStore().getTimerMessageStore();
+        if (null == timerMessageStore) {
+            LOGGER.error("The timer message store is null, client: {}", ctx.channel().remoteAddress());
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("The timer message store is null");
+            return response;
+        }
+        response.setBody(timerMessageStore.getTimerMetrics().encode().getBytes(StandardCharsets.UTF_8));
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
     private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
index f86d2cf6b..a1b41d5c3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
@@ -56,6 +56,7 @@ public class TimerMetrics extends ConfigManager {
 
     private final ConcurrentMap<Integer, Metric> timingDistribution =
             new ConcurrentHashMap<>(1024);
+
     public List<Integer> timerDist = new ArrayList<Integer>() {{
             add(5);
             add(60);
@@ -142,7 +143,7 @@ public class TimerMetrics extends ConfigManager {
 
     @Override
     public String encode() {
-        return null;
+        return encode(false);
     }
 
     @Override
@@ -154,16 +155,20 @@ public class TimerMetrics extends ConfigManager {
     public void decode(String jsonString) {
         if (jsonString != null) {
             TimerMetricsSerializeWrapper timerMetricsSerializeWrapper =
-                    TimerMetricsSerializeWrapper.fromJson(jsonString, TimerMetricsSerializeWrapper.class);
+                TimerMetricsSerializeWrapper.fromJson(jsonString, TimerMetricsSerializeWrapper.class);
             if (timerMetricsSerializeWrapper != null) {
                 this.timingCount.putAll(timerMetricsSerializeWrapper.getTimingCount());
+                this.dataVersion.assignNewOne(timerMetricsSerializeWrapper.getDataVersion());
             }
         }
     }
 
     @Override
     public String encode(boolean prettyFormat) {
-        return null;
+        TimerMetricsSerializeWrapper metricsSerializeWrapper = new TimerMetricsSerializeWrapper();
+        metricsSerializeWrapper.setDataVersion(this.dataVersion);
+        metricsSerializeWrapper.setTimingCount(this.timingCount);
+        return metricsSerializeWrapper.toJson(prettyFormat);
     }
 
     public DataVersion getDataVersion() {