You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/09/12 02:17:57 UTC
[rocketmq] branch develop updated: [ISSUE] Add get stats and single
queue stats for schedule topic
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 564ee29 [ISSUE] Add get stats and single queue stats for schedule topic
564ee29 is described below
commit 564ee290c7001c899e0eaeef8b6e8858c9b03c53
Author: Git_Yang <30...@users.noreply.github.com>
AuthorDate: Sun Sep 12 10:17:38 2021 +0800
[ISSUE] Add get stats and single queue stats for schedule topic
Signed-off-by: zhangyang21 <zh...@xiaomi.com>
---
.../broker/processor/SendMessageProcessor.java | 6 +++
.../java/org/apache/rocketmq/common/MixAll.java | 1 +
.../store/schedule/ScheduleMessageService.java | 5 +++
.../rocketmq/store/stats/BrokerStatsManager.java | 44 ++++++++++++++++++--
.../test/java/stats/BrokerStatsManagerTest.java | 47 ++++++++++++++++++++++
5 files changed, 99 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 1b1cf4b..2cd142f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
@@ -515,6 +516,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
if (sendOK) {
+ if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msg.getTopic())) {
+ this.brokerController.getBrokerStatsManager().incQueuePutNums(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
+ this.brokerController.getBrokerStatsManager().incQueuePutSize(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());
+ }
+
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 9d95ecb..ec1e1f0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -58,6 +58,7 @@ public class MixAll {
public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
+ public static final String SCHEDULE_CONSUMER_GROUP = "SCHEDULE_CONSUMER";
public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER";
public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER";
public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index bacae1e..1164ab8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.topic.TopicValidator;
@@ -318,6 +319,10 @@ public class ScheduleMessageService extends ConfigManager {
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 7bb6a8b..3e643e3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -29,6 +29,10 @@ import org.apache.rocketmq.common.stats.StatsItemSet;
public class BrokerStatsManager {
+ public static final String QUEUE_PUT_NUMS = "QUEUE_PUT_NUMS";
+ public static final String QUEUE_PUT_SIZE = "QUEUE_PUT_SIZE";
+ public static final String QUEUE_GET_NUMS = "QUEUE_GET_NUMS";
+ public static final String QUEUE_GET_SIZE = "QUEUE_GET_SIZE";
public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS";
public static final String TOPIC_PUT_SIZE = "TOPIC_PUT_SIZE";
public static final String GROUP_GET_NUMS = "GROUP_GET_NUMS";
@@ -74,6 +78,10 @@ public class BrokerStatsManager {
public BrokerStatsManager(String clusterName) {
this.clusterName = clusterName;
+ this.statsTable.put(QUEUE_PUT_NUMS, new StatsItemSet(QUEUE_PUT_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(QUEUE_PUT_SIZE, new StatsItemSet(QUEUE_PUT_SIZE, this.scheduledExecutorService, log));
+ this.statsTable.put(QUEUE_GET_NUMS, new StatsItemSet(QUEUE_GET_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(QUEUE_GET_SIZE, new StatsItemSet(QUEUE_GET_SIZE, this.scheduledExecutorService, log));
this.statsTable.put(TOPIC_PUT_NUMS, new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(TOPIC_PUT_SIZE, new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, log));
this.statsTable.put(GROUP_GET_NUMS, new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, log));
@@ -124,8 +132,12 @@ public class BrokerStatsManager {
public void onTopicDeleted(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).delValue(topic);
this.statsTable.get(TOPIC_PUT_SIZE).delValue(topic);
+ this.statsTable.get(QUEUE_PUT_NUMS).delValueByPrefixKey(topic, "@");
+ this.statsTable.get(QUEUE_PUT_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(GROUP_GET_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(GROUP_GET_SIZE).delValueByPrefixKey(topic, "@");
+ this.statsTable.get(QUEUE_GET_NUMS).delValueByPrefixKey(topic, "@");
+ this.statsTable.get(QUEUE_GET_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(GROUP_GET_LATENCY).delValueByInfixKey(topic, "@");
this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
@@ -135,12 +147,36 @@ public class BrokerStatsManager {
public void onGroupDeleted(final String group) {
this.statsTable.get(GROUP_GET_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(GROUP_GET_SIZE).delValueBySuffixKey(group, "@");
+ this.statsTable.get(QUEUE_GET_NUMS).delValueBySuffixKey(group, "@");
+ this.statsTable.get(QUEUE_GET_SIZE).delValueBySuffixKey(group, "@");
this.statsTable.get(SNDBCK_PUT_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(GROUP_GET_LATENCY).delValueBySuffixKey(group, "@");
this.momentStatsItemSetFallSize.delValueBySuffixKey(group, "@");
this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
}
+ public void incQueuePutNums(final String topic, final Integer queueId) {
+ this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, String.valueOf(queueId)), 1, 1);
+ }
+
+ public void incQueuePutNums(final String topic, final Integer queueId, int num, int times) {
+ this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, String.valueOf(queueId)), num, times);
+ }
+
+ public void incQueuePutSize(final String topic, final Integer queueId, final int size) {
+ this.statsTable.get(QUEUE_PUT_SIZE).addValue(buildStatsKey(topic, String.valueOf(queueId)), size, 1);
+ }
+
+ public void incQueueGetNums(final String group, final String topic, final Integer queueId, final int incValue) {
+ final String statsKey = buildStatsKey(buildStatsKey(topic, String.valueOf(queueId)), group);
+ this.statsTable.get(QUEUE_GET_NUMS).addValue(statsKey, incValue, 1);
+ }
+
+ public void incQueueGetSize(final String group, final String topic, final Integer queueId, final int incValue) {
+ final String statsKey = buildStatsKey(buildStatsKey(topic, String.valueOf(queueId)), group);
+ this.statsTable.get(QUEUE_GET_SIZE).addValue(statsKey, incValue, 1);
+ }
+
public void incTopicPutNums(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
}
@@ -158,11 +194,11 @@ public class BrokerStatsManager {
this.statsTable.get(GROUP_GET_NUMS).addValue(statsKey, incValue, 1);
}
- public String buildStatsKey(String topic, String group) {
- StringBuilder strBuilder = new StringBuilder();
- strBuilder.append(topic);
+ public String buildStatsKey(String prefix, String suffix) {
+ StringBuffer strBuilder = new StringBuffer();
+ strBuilder.append(prefix);
strBuilder.append("@");
- strBuilder.append(group);
+ strBuilder.append(suffix);
return strBuilder.toString();
}
diff --git a/store/src/test/java/stats/BrokerStatsManagerTest.java b/store/src/test/java/stats/BrokerStatsManagerTest.java
index 1702072..2b6d0f8 100644
--- a/store/src/test/java/stats/BrokerStatsManagerTest.java
+++ b/store/src/test/java/stats/BrokerStatsManagerTest.java
@@ -29,6 +29,10 @@ import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_LATENCY;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_SIZE;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_GET_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_GET_SIZE;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_PUT_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_PUT_SIZE;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.SNDBCK_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE;
@@ -38,6 +42,7 @@ public class BrokerStatsManagerTest {
private BrokerStatsManager brokerStatsManager;
private String TOPIC = "TOPIC_TEST";
+ private Integer QUEUE_ID = 0;
private String GROUP_NAME = "GROUP_TEST";
@Before
@@ -57,6 +62,36 @@ public class BrokerStatsManagerTest {
}
@Test
+ public void testIncQueuePutNums() {
+ brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID);
+ String statsKey = brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID));
+ assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, statsKey).getTimes().doubleValue()).isEqualTo(1L);
+ brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID, 2, 2);
+ assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, statsKey).getValue().doubleValue()).isEqualTo(3L);
+ }
+
+ @Test
+ public void testIncQueuePutSize() {
+ brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 2);
+ String statsKey = brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID));
+ assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_SIZE, statsKey).getValue().doubleValue()).isEqualTo(2L);
+ }
+
+ @Test
+ public void testIncQueueGetNums() {
+ brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
+ final String statsKey = brokerStatsManager.buildStatsKey(brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID)), GROUP_NAME);
+ assertThat(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, statsKey).getValue().doubleValue()).isEqualTo(1L);
+ }
+
+ @Test
+ public void testIncQueueGetSize() {
+ brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 1);
+ final String statsKey = brokerStatsManager.buildStatsKey(brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID)), GROUP_NAME);
+ assertThat(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, statsKey).getValue().doubleValue()).isEqualTo(1L);
+ }
+
+ @Test
public void testIncTopicPutNums() {
brokerStatsManager.incTopicPutNums(TOPIC);
assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getTimes().doubleValue()).isEqualTo(1L);
@@ -101,8 +136,12 @@ public class BrokerStatsManagerTest {
public void testOnTopicDeleted() {
brokerStatsManager.incTopicPutNums(TOPIC);
brokerStatsManager.incTopicPutSize(TOPIC, 100);
+ brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID);
+ brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 100);
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
+ brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
+ brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
@@ -112,8 +151,12 @@ public class BrokerStatsManagerTest {
Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC));
Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC));
+ Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, TOPIC + "@" + QUEUE_ID));
+ Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_PUT_SIZE, TOPIC + "@" + QUEUE_ID));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
+ Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
+ Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
@@ -124,6 +167,8 @@ public class BrokerStatsManagerTest {
public void testOnGroupDeleted(){
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
+ brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
+ brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
@@ -133,6 +178,8 @@ public class BrokerStatsManagerTest {
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
+ Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
+ Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));