You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/09/12 02:17:57 UTC

[rocketmq] branch develop updated: [ISSUE] Add get stats and single queue stats for schedule topic

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 564ee29  [ISSUE] Add get stats and single queue stats for schedule topic
564ee29 is described below

commit 564ee290c7001c899e0eaeef8b6e8858c9b03c53
Author: Git_Yang <30...@users.noreply.github.com>
AuthorDate: Sun Sep 12 10:17:38 2021 +0800

    [ISSUE] Add get stats and single queue stats for schedule topic
    
    Signed-off-by: zhangyang21 <zh...@xiaomi.com>
---
 .../broker/processor/SendMessageProcessor.java     |  6 +++
 .../java/org/apache/rocketmq/common/MixAll.java    |  1 +
 .../store/schedule/ScheduleMessageService.java     |  5 +++
 .../rocketmq/store/stats/BrokerStatsManager.java   | 44 ++++++++++++++++++--
 .../test/java/stats/BrokerStatsManagerTest.java    | 47 ++++++++++++++++++++++
 5 files changed, 99 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 1b1cf4b..2cd142f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
@@ -515,6 +516,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
         if (sendOK) {
 
+            if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msg.getTopic())) {
+                this.brokerController.getBrokerStatsManager().incQueuePutNums(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
+                this.brokerController.getBrokerStatsManager().incQueuePutSize(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());
+            }
+
             this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
             this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                 putMessageResult.getAppendMessageResult().getWroteBytes());
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 9d95ecb..ec1e1f0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -58,6 +58,7 @@ public class MixAll {
     public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
     public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
     public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
+    public static final String SCHEDULE_CONSUMER_GROUP = "SCHEDULE_CONSUMER";
     public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER";
     public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER";
     public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index bacae1e..1164ab8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.topic.TopicValidator;
@@ -318,6 +319,10 @@ public class ScheduleMessageService extends ConfigManager {
                                         if (putMessageResult != null
                                             && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                             if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
+                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
+                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
+                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
+                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
                                                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
                                                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
                                                     putMessageResult.getAppendMessageResult().getWroteBytes());
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 7bb6a8b..3e643e3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -29,6 +29,10 @@ import org.apache.rocketmq.common.stats.StatsItemSet;
 
 public class BrokerStatsManager {
 
+    public static final String QUEUE_PUT_NUMS = "QUEUE_PUT_NUMS";
+    public static final String QUEUE_PUT_SIZE = "QUEUE_PUT_SIZE";
+    public static final String QUEUE_GET_NUMS = "QUEUE_GET_NUMS";
+    public static final String QUEUE_GET_SIZE = "QUEUE_GET_SIZE";
     public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS";
     public static final String TOPIC_PUT_SIZE = "TOPIC_PUT_SIZE";
     public static final String GROUP_GET_NUMS = "GROUP_GET_NUMS";
@@ -74,6 +78,10 @@ public class BrokerStatsManager {
     public BrokerStatsManager(String clusterName) {
         this.clusterName = clusterName;
 
+        this.statsTable.put(QUEUE_PUT_NUMS, new StatsItemSet(QUEUE_PUT_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(QUEUE_PUT_SIZE, new StatsItemSet(QUEUE_PUT_SIZE, this.scheduledExecutorService, log));
+        this.statsTable.put(QUEUE_GET_NUMS, new StatsItemSet(QUEUE_GET_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(QUEUE_GET_SIZE, new StatsItemSet(QUEUE_GET_SIZE, this.scheduledExecutorService, log));
         this.statsTable.put(TOPIC_PUT_NUMS, new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, log));
         this.statsTable.put(TOPIC_PUT_SIZE, new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, log));
         this.statsTable.put(GROUP_GET_NUMS, new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, log));
@@ -124,8 +132,12 @@ public class BrokerStatsManager {
     public void onTopicDeleted(final String topic) {
         this.statsTable.get(TOPIC_PUT_NUMS).delValue(topic);
         this.statsTable.get(TOPIC_PUT_SIZE).delValue(topic);
+        this.statsTable.get(QUEUE_PUT_NUMS).delValueByPrefixKey(topic, "@");
+        this.statsTable.get(QUEUE_PUT_SIZE).delValueByPrefixKey(topic, "@");
         this.statsTable.get(GROUP_GET_NUMS).delValueByPrefixKey(topic, "@");
         this.statsTable.get(GROUP_GET_SIZE).delValueByPrefixKey(topic, "@");
+        this.statsTable.get(QUEUE_GET_NUMS).delValueByPrefixKey(topic, "@");
+        this.statsTable.get(QUEUE_GET_SIZE).delValueByPrefixKey(topic, "@");
         this.statsTable.get(SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@");
         this.statsTable.get(GROUP_GET_LATENCY).delValueByInfixKey(topic, "@");
         this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
@@ -135,12 +147,36 @@ public class BrokerStatsManager {
     public void onGroupDeleted(final String group) {
         this.statsTable.get(GROUP_GET_NUMS).delValueBySuffixKey(group, "@");
         this.statsTable.get(GROUP_GET_SIZE).delValueBySuffixKey(group, "@");
+        this.statsTable.get(QUEUE_GET_NUMS).delValueBySuffixKey(group, "@");
+        this.statsTable.get(QUEUE_GET_SIZE).delValueBySuffixKey(group, "@");
         this.statsTable.get(SNDBCK_PUT_NUMS).delValueBySuffixKey(group, "@");
         this.statsTable.get(GROUP_GET_LATENCY).delValueBySuffixKey(group, "@");
         this.momentStatsItemSetFallSize.delValueBySuffixKey(group, "@");
         this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
     }
 
+    public void incQueuePutNums(final String topic, final Integer queueId) {
+        this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, String.valueOf(queueId)), 1, 1);
+    }
+
+    public void incQueuePutNums(final String topic, final Integer queueId, int num, int times) {
+        this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, String.valueOf(queueId)), num, times);
+    }
+
+    public void incQueuePutSize(final String topic, final Integer queueId, final int size) {
+        this.statsTable.get(QUEUE_PUT_SIZE).addValue(buildStatsKey(topic, String.valueOf(queueId)), size, 1);
+    }
+
+    public void incQueueGetNums(final String group, final String topic, final Integer queueId, final int incValue) {
+        final String statsKey = buildStatsKey(buildStatsKey(topic, String.valueOf(queueId)), group);
+        this.statsTable.get(QUEUE_GET_NUMS).addValue(statsKey, incValue, 1);
+    }
+
+    public void incQueueGetSize(final String group, final String topic, final Integer queueId, final int incValue) {
+        final String statsKey = buildStatsKey(buildStatsKey(topic, String.valueOf(queueId)), group);
+        this.statsTable.get(QUEUE_GET_SIZE).addValue(statsKey, incValue, 1);
+    }
+
     public void incTopicPutNums(final String topic) {
         this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
     }
@@ -158,11 +194,11 @@ public class BrokerStatsManager {
         this.statsTable.get(GROUP_GET_NUMS).addValue(statsKey, incValue, 1);
     }
 
-    public String buildStatsKey(String topic, String group) {
-        StringBuilder strBuilder = new StringBuilder();
-        strBuilder.append(topic);
+    public String buildStatsKey(String prefix, String suffix) {
+        StringBuilder strBuilder = new StringBuilder(); // method-local buffer: joins prefix and suffix with "@", no synchronization needed
+        strBuilder.append(prefix);
         strBuilder.append("@");
-        strBuilder.append(group);
+        strBuilder.append(suffix);
         return strBuilder.toString();
     }
 
diff --git a/store/src/test/java/stats/BrokerStatsManagerTest.java b/store/src/test/java/stats/BrokerStatsManagerTest.java
index 1702072..2b6d0f8 100644
--- a/store/src/test/java/stats/BrokerStatsManagerTest.java
+++ b/store/src/test/java/stats/BrokerStatsManagerTest.java
@@ -29,6 +29,10 @@ import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_
 import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_LATENCY;
 import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_NUMS;
 import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_SIZE;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_GET_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_GET_SIZE;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_PUT_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_PUT_SIZE;
 import static org.apache.rocketmq.store.stats.BrokerStatsManager.SNDBCK_PUT_NUMS;
 import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS;
 import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE;
@@ -38,6 +42,7 @@ public class BrokerStatsManagerTest {
     private BrokerStatsManager brokerStatsManager;
 
     private String TOPIC = "TOPIC_TEST";
+    private Integer QUEUE_ID = 0;
     private String GROUP_NAME = "GROUP_TEST";
 
     @Before
@@ -57,6 +62,36 @@ public class BrokerStatsManagerTest {
     }
 
     @Test
+    public void testIncQueuePutNums() {
+        brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID);
+        String statsKey = brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID));
+        assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, statsKey).getTimes().doubleValue()).isEqualTo(1L);
+        brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID, 2, 2);
+        assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, statsKey).getValue().doubleValue()).isEqualTo(3L);
+    }
+
+    @Test
+    public void testIncQueuePutSize() {
+        brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 2);
+        String statsKey = brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID));
+        assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_SIZE, statsKey).getValue().doubleValue()).isEqualTo(2L);
+    }
+
+    @Test
+    public void testIncQueueGetNums() {
+        brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
+        final String statsKey = brokerStatsManager.buildStatsKey(brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID)), GROUP_NAME);
+        assertThat(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, statsKey).getValue().doubleValue()).isEqualTo(1L);
+    }
+
+    @Test
+    public void testIncQueueGetSize() {
+        brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 1);
+        final String statsKey = brokerStatsManager.buildStatsKey(brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID)), GROUP_NAME);
+        assertThat(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, statsKey).getValue().doubleValue()).isEqualTo(1L);
+    }
+
+    @Test
     public void testIncTopicPutNums() {
         brokerStatsManager.incTopicPutNums(TOPIC);
         assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getTimes().doubleValue()).isEqualTo(1L);
@@ -101,8 +136,12 @@ public class BrokerStatsManagerTest {
     public void testOnTopicDeleted() {
         brokerStatsManager.incTopicPutNums(TOPIC);
         brokerStatsManager.incTopicPutSize(TOPIC, 100);
+        brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID);
+        brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 100);
         brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
         brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
+        brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
+        brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100);
         brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
         brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
         brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
@@ -112,8 +151,12 @@ public class BrokerStatsManagerTest {
 
         Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC));
         Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC));
+        Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, TOPIC + "@" + QUEUE_ID));
+        Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_PUT_SIZE, TOPIC + "@" + QUEUE_ID));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
@@ -124,6 +167,8 @@ public class BrokerStatsManagerTest {
     public void testOnGroupDeleted(){
         brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
         brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
+        brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
+        brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100);
         brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
         brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
         brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
@@ -133,6 +178,8 @@ public class BrokerStatsManagerTest {
 
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));