You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2020/05/23 10:04:59 UTC
[rocketmq] branch develop updated: [ISSUE #1689]add interfaces to
remove unused statsItems in BrokerStatsManager. (#2029)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5873c76 [ISSUE #1689]add interfaces to remove unused statsItems in BrokerStatsManager. (#2029)
5873c76 is described below
commit 5873c76dbd29c8fefd7cb3f1df455eaeaec7ce83
Author: Hao Zhang <10...@qq.com>
AuthorDate: Sat May 23 18:04:41 2020 +0800
[ISSUE #1689]add interfaces to remove unused statsItems in BrokerStatsManager. (#2029)
---
.../broker/processor/AdminBrokerProcessor.java | 7 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 10 ++
.../rocketmq/common/stats/MomentStatsItemSet.java | 20 +++
.../apache/rocketmq/common/stats/StatsItemSet.java | 37 ++++++
.../apache/rocketmq/common/BrokerConfigTest.java | 2 +
.../apache/rocketmq/store/DefaultMessageStore.java | 4 +
.../rocketmq/store/stats/BrokerStatsManager.java | 20 +++
.../test/java/stats/BrokerStatsManagerTest.java | 141 +++++++++++++++++++++
8 files changed, 240 insertions(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index fb7aece..eb81183 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -299,7 +299,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
this.brokerController.getMessageStore()
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
-
+ if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
+ this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
+ }
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
@@ -715,6 +717,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName());
+ if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
+ this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName());
+ }
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a7568f0..bfe8a21 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -184,6 +184,8 @@ public class BrokerConfig {
private boolean storeReplyMessageEnable = true;
+ private boolean autoDeleteUnusedStats = false;
+
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
@@ -793,4 +795,12 @@ public class BrokerConfig {
public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) {
this.storeReplyMessageEnable = storeReplyMessageEnable;
}
+
+    /**
+     * Returns the {@code autoDeleteUnusedStats} switch. Callers in this change consult it
+     * before asking {@code BrokerStatsManager} to drop the statistics of a deleted topic or
+     * subscription group. The field initializer leaves the switch off.
+     *
+     * @return {@code true} when the switch is enabled
+     */
+    public boolean isAutoDeleteUnusedStats() {
+        return this.autoDeleteUnusedStats;
+    }
+
+    /**
+     * Stores a new value for the {@code autoDeleteUnusedStats} switch.
+     *
+     * @param autoDeleteUnusedStats the value later returned by {@code isAutoDeleteUnusedStats()}
+     */
+    public void setAutoDeleteUnusedStats(boolean autoDeleteUnusedStats) {
+        this.autoDeleteUnusedStats = autoDeleteUnusedStats;
+    }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
index 7ff26cd..4d2ce0c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
@@ -74,6 +74,26 @@ public class MomentStatsItemSet {
statsItem.getValue().set(value);
}
+    /**
+     * Removes every entry of {@code statsItemTable} whose key contains
+     * {@code separator + statsKey + separator}.
+     *
+     * @param statsKey the key segment to look for
+     * @param separator the delimiter expected on both sides of {@code statsKey}
+     */
+    public void delValueByInfixKey(final String statsKey, String separator) {
+        // The pattern is the same for every entry, so give it a name before the scan.
+        final String infix = separator + statsKey + separator;
+        for (Iterator<Entry<String, MomentStatsItem>> cursor = this.statsItemTable.entrySet().iterator();
+             cursor.hasNext(); ) {
+            final String key = cursor.next().getKey();
+            if (key.contains(infix)) {
+                // Delete through the iterator that is driving the scan.
+                cursor.remove();
+            }
+        }
+    }
+
+    /**
+     * Removes every entry of {@code statsItemTable} whose key ends with
+     * {@code separator + statsKey}.
+     *
+     * @param statsKey the trailing key segment to look for
+     * @param separator the delimiter expected immediately before {@code statsKey}
+     */
+    public void delValueBySuffixKey(final String statsKey, String separator) {
+        // The pattern is the same for every entry, so give it a name before the scan.
+        final String suffix = separator + statsKey;
+        for (Iterator<Entry<String, MomentStatsItem>> cursor = this.statsItemTable.entrySet().iterator();
+             cursor.hasNext(); ) {
+            final String key = cursor.next().getKey();
+            if (key.endsWith(suffix)) {
+                // Delete through the iterator that is driving the scan.
+                cursor.remove();
+            }
+        }
+    }
+
public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
MomentStatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
index 1c2de33..bcf9665 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -158,6 +158,43 @@ public class StatsItemSet {
statsItem.getTimes().addAndGet(incTimes);
}
+    /**
+     * Removes the stats item stored under exactly {@code statsKey}, if there is one.
+     *
+     * @param statsKey the full key of the item to drop
+     */
+    public void delValue(final String statsKey) {
+        // A single remove replaces the former get-then-remove pair: it avoids the second
+        // lookup and the gap between the check and the removal. Removing a key that is not
+        // present is harmless (assuming statsItemTable follows the java.util.Map contract).
+        this.statsItemTable.remove(statsKey);
+    }
+
+    /**
+     * Removes every entry of {@code statsItemTable} whose key starts with
+     * {@code statsKey + separator}.
+     *
+     * @param statsKey the leading key segment to look for
+     * @param separator the delimiter expected immediately after {@code statsKey}
+     */
+    public void delValueByPrefixKey(final String statsKey, String separator) {
+        // The pattern is the same for every entry, so give it a name before the scan.
+        final String prefix = statsKey + separator;
+        for (Iterator<Entry<String, StatsItem>> cursor = this.statsItemTable.entrySet().iterator();
+             cursor.hasNext(); ) {
+            final String key = cursor.next().getKey();
+            if (key.startsWith(prefix)) {
+                // Delete through the iterator that is driving the scan.
+                cursor.remove();
+            }
+        }
+    }
+
+    /**
+     * Removes every entry of {@code statsItemTable} whose key contains
+     * {@code separator + statsKey + separator}.
+     *
+     * @param statsKey the key segment to look for
+     * @param separator the delimiter expected on both sides of {@code statsKey}
+     */
+    public void delValueByInfixKey(final String statsKey, String separator) {
+        // The pattern is the same for every entry, so give it a name before the scan.
+        final String infix = separator + statsKey + separator;
+        for (Iterator<Entry<String, StatsItem>> cursor = this.statsItemTable.entrySet().iterator();
+             cursor.hasNext(); ) {
+            final String key = cursor.next().getKey();
+            if (key.contains(infix)) {
+                // Delete through the iterator that is driving the scan.
+                cursor.remove();
+            }
+        }
+    }
+
+    /**
+     * Removes every entry of {@code statsItemTable} whose key ends with
+     * {@code separator + statsKey}.
+     *
+     * @param statsKey the trailing key segment to look for
+     * @param separator the delimiter expected immediately before {@code statsKey}
+     */
+    public void delValueBySuffixKey(final String statsKey, String separator) {
+        // The pattern is the same for every entry, so give it a name before the scan.
+        final String suffix = separator + statsKey;
+        for (Iterator<Entry<String, StatsItem>> cursor = this.statsItemTable.entrySet().iterator();
+             cursor.hasNext(); ) {
+            final String key = cursor.next().getKey();
+            if (key.endsWith(suffix)) {
+                // Delete through the iterator that is driving the scan.
+                cursor.remove();
+            }
+        }
+    }
+
public StatsItem getAndCreateStatsItem(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
index aa8bcfa..07e132f 100644
--- a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
@@ -37,11 +37,13 @@ public class BrokerConfigTest {
brokerConfig.setBrokerId(0);
brokerConfig.setBrokerClusterName("DefaultCluster");
brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4");
+ brokerConfig.setAutoDeleteUnusedStats(true);
assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
+ assertThat(brokerConfig.isAutoDeleteUnusedStats()).isEqualTo(true);
}
}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index f59b5eb9..14d24d6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1029,6 +1029,10 @@ public class DefaultMessageStore implements MessageStore {
}
it.remove();
+ if (this.brokerConfig.isAutoDeleteUnusedStats()) {
+ this.brokerStatsManager.onTopicDeleted(topic);
+ }
+
log.info("cleanUnusedTopic: {},topic destroyed", topic);
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 4adbed7..e151844 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -121,6 +121,26 @@ public class BrokerStatsManager {
return null;
}
+    /**
+     * Drops the statistics recorded for {@code topic}.
+     *
+     * <p>Three removal strategies are used, in this order: an exact-key delete for the
+     * per-topic put counters, a {@code topic + "@"} prefix delete for the group-get and
+     * send-back counters, and an {@code "@" + topic + "@"} infix delete for the group-get
+     * latency set and the two fall-behind moment sets.
+     *
+     * @param topic name of the topic whose statistics are removed
+     */
+    public void onTopicDeleted(final String topic) {
+        final String separator = "@";
+
+        // Keyed by the bare topic name.
+        for (String statsName : new String[] {TOPIC_PUT_NUMS, TOPIC_PUT_SIZE}) {
+            this.statsTable.get(statsName).delValue(topic);
+        }
+
+        // Keyed with the topic as the leading segment.
+        for (String statsName : new String[] {GROUP_GET_NUMS, GROUP_GET_SIZE, SNDBCK_PUT_NUMS}) {
+            this.statsTable.get(statsName).delValueByPrefixKey(topic, separator);
+        }
+
+        // Keyed with the topic as an inner segment.
+        this.statsTable.get(GROUP_GET_LATENCY).delValueByInfixKey(topic, separator);
+        this.momentStatsItemSetFallSize.delValueByInfixKey(topic, separator);
+        this.momentStatsItemSetFallTime.delValueByInfixKey(topic, separator);
+    }
+
+    /**
+     * Drops the statistics recorded for the consumer group {@code group}.
+     *
+     * <p>Every affected set is cleaned with an {@code "@" + group} suffix delete: first the
+     * group-get, send-back and latency sets held in {@code statsTable}, then the two
+     * fall-behind moment sets.
+     *
+     * @param group name of the group whose statistics are removed
+     */
+    public void onGroupDeleted(final String group) {
+        final String separator = "@";
+
+        final String[] groupScopedStats = {GROUP_GET_NUMS, GROUP_GET_SIZE, SNDBCK_PUT_NUMS, GROUP_GET_LATENCY};
+        for (String statsName : groupScopedStats) {
+            this.statsTable.get(statsName).delValueBySuffixKey(group, separator);
+        }
+
+        this.momentStatsItemSetFallSize.delValueBySuffixKey(group, separator);
+        this.momentStatsItemSetFallTime.delValueBySuffixKey(group, separator);
+    }
+
public void incTopicPutNums(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
}
diff --git a/store/src/test/java/stats/BrokerStatsManagerTest.java b/store/src/test/java/stats/BrokerStatsManagerTest.java
new file mode 100644
index 0000000..1702072
--- /dev/null
+++ b/store/src/test/java/stats/BrokerStatsManagerTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package stats;
+
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.BROKER_PUT_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_SIZE;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_TIME;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_LATENCY;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_SIZE;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.SNDBCK_PUT_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link BrokerStatsManager}: the counter increment methods and the
+ * {@code onTopicDeleted} / {@code onGroupDeleted} clean-up hooks.
+ */
+public class BrokerStatsManagerTest {
+    // Shared fixture names; constants rather than mutable per-instance fields.
+    private static final String TOPIC = "TOPIC_TEST";
+    private static final String GROUP_NAME = "GROUP_TEST";
+
+    private BrokerStatsManager brokerStatsManager;
+
+    /** Builds and starts a new manager before each test. */
+    @Before
+    public void init() {
+        brokerStatsManager = new BrokerStatsManager("DefaultCluster");
+        brokerStatsManager.start();
+    }
+
+    /** Shuts down the manager started by {@link #init()}. */
+    @After
+    public void destroy() {
+        brokerStatsManager.shutdown();
+    }
+
+    /** Looking up a stats name that was never used yields {@code null}. */
+    @Test
+    public void testGetStatsItem() {
+        assertThat(brokerStatsManager.getStatsItem("TEST", "TEST")).isNull();
+    }
+
+    /** Both overloads of {@code incTopicPutNums} accumulate into the same item. */
+    @Test
+    public void testIncTopicPutNums() {
+        brokerStatsManager.incTopicPutNums(TOPIC);
+        assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getTimes().doubleValue()).isEqualTo(1L);
+        brokerStatsManager.incTopicPutNums(TOPIC, 2, 2);
+        assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getValue().doubleValue()).isEqualTo(3L);
+    }
+
+    /** {@code incTopicPutSize} records the given size under the topic key. */
+    @Test
+    public void testIncTopicPutSize() {
+        brokerStatsManager.incTopicPutSize(TOPIC, 2);
+        assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC).getValue().doubleValue()).isEqualTo(2L);
+    }
+
+    /** {@code incGroupGetNums} records under the key built from topic and group. */
+    @Test
+    public void testIncGroupGetNums() {
+        brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
+        String statsKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME);
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, statsKey).getValue().doubleValue()).isEqualTo(1L);
+    }
+
+    /** {@code incGroupGetSize} records under the key built from topic and group. */
+    @Test
+    public void testIncGroupGetSize() {
+        brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 1);
+        String statsKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME);
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, statsKey).getValue().doubleValue()).isEqualTo(1L);
+    }
+
+    /** {@code incGroupGetLatency} records under a three-segment {@code n@topic@group} key. */
+    @Test
+    public void testIncGroupGetLatency() {
+        brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
+        String statsKey = String.format("%d@%s@%s", 1, TOPIC, GROUP_NAME);
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, statsKey).getValue().doubleValue()).isEqualTo(1L);
+    }
+
+    /** {@code incBrokerPutNums} records under the cluster name given to the constructor. */
+    @Test
+    public void testIncBrokerPutNums() {
+        brokerStatsManager.incBrokerPutNums();
+        assertThat(brokerStatsManager.getStatsItem(BROKER_PUT_NUMS, "DefaultCluster").getValue().doubleValue()).isEqualTo(1L);
+    }
+
+    /** After {@code onTopicDeleted}, no statistics recorded for the topic can be looked up. */
+    @Test
+    public void testOnTopicDeleted() {
+        brokerStatsManager.incTopicPutNums(TOPIC);
+        brokerStatsManager.incTopicPutSize(TOPIC, 100);
+        brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
+        brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
+        brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
+        brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
+        brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
+        brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
+
+        // Sanity check: the items must exist before deletion, otherwise the null checks
+        // further down would prove nothing. Keys are the ones the increment tests read back.
+        String topicGroupKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME);
+        String latencyKey = String.format("%d@%s@%s", 1, TOPIC, GROUP_NAME);
+        assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC)).isNotNull();
+        assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC)).isNotNull();
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, topicGroupKey)).isNotNull();
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, topicGroupKey)).isNotNull();
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, latencyKey)).isNotNull();
+
+        brokerStatsManager.onTopicDeleted(TOPIC);
+
+        Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC));
+        Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
+        // NOTE(review): BrokerStatsManager keeps the fall-behind figures in separate
+        // MomentStatsItemSet fields rather than in statsTable; confirm getStatsItem can see
+        // them, otherwise the two assertions below pass regardless of the deletion.
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
+    }
+
+    /** After {@code onGroupDeleted}, no statistics recorded for the group can be looked up. */
+    @Test
+    public void testOnGroupDeleted() {
+        brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
+        brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
+        brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
+        brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
+        brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
+        brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
+
+        // Sanity check: the items must exist before deletion, otherwise the null checks
+        // further down would prove nothing. Keys are the ones the increment tests read back.
+        String topicGroupKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME);
+        String latencyKey = String.format("%d@%s@%s", 1, TOPIC, GROUP_NAME);
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, topicGroupKey)).isNotNull();
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, topicGroupKey)).isNotNull();
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, latencyKey)).isNotNull();
+
+        brokerStatsManager.onGroupDeleted(GROUP_NAME);
+
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
+        // NOTE(review): BrokerStatsManager keeps the fall-behind figures in separate
+        // MomentStatsItemSet fields rather than in statsTable; confirm getStatsItem can see
+        // them, otherwise the two assertions below pass regardless of the deletion.
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
+    }
+}