You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2020/05/23 10:04:59 UTC

[rocketmq] branch develop updated: [ISSUE #1689]add interfaces to remove unused statsItems in BrokerStatsManager. (#2029)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5873c76  [ISSUE #1689]add interfaces to remove unused statsItems in BrokerStatsManager. (#2029)
5873c76 is described below

commit 5873c76dbd29c8fefd7cb3f1df455eaeaec7ce83
Author: Hao Zhang <10...@qq.com>
AuthorDate: Sat May 23 18:04:41 2020 +0800

    [ISSUE #1689]add interfaces to remove unused statsItems in BrokerStatsManager. (#2029)
---
 .../broker/processor/AdminBrokerProcessor.java     |   7 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 ++
 .../rocketmq/common/stats/MomentStatsItemSet.java  |  20 +++
 .../apache/rocketmq/common/stats/StatsItemSet.java |  37 ++++++
 .../apache/rocketmq/common/BrokerConfigTest.java   |   2 +
 .../apache/rocketmq/store/DefaultMessageStore.java |   4 +
 .../rocketmq/store/stats/BrokerStatsManager.java   |  20 +++
 .../test/java/stats/BrokerStatsManagerTest.java    | 141 +++++++++++++++++++++
 8 files changed, 240 insertions(+), 1 deletion(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index fb7aece..eb81183 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -299,7 +299,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
         this.brokerController.getMessageStore()
             .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
-
+        if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
+            this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
+        }
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         return response;
@@ -715,6 +717,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
 
         this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName());
 
+        if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
+            this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName());
+        }
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         return response;
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a7568f0..bfe8a21 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -184,6 +184,8 @@ public class BrokerConfig {
 
     private boolean storeReplyMessageEnable = true;
 
+    private boolean autoDeleteUnusedStats = false;
+
     public static String localHostName() {
         try {
             return InetAddress.getLocalHost().getHostName();
@@ -793,4 +795,12 @@ public class BrokerConfig {
     public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) {
         this.storeReplyMessageEnable = storeReplyMessageEnable;
     }
+
+    public boolean isAutoDeleteUnusedStats() {
+        return autoDeleteUnusedStats; // gates the stats cleanup run when a topic or group is deleted
+    }
+
+    public void setAutoDeleteUnusedStats(boolean autoDeleteUnusedStats) {
+        this.autoDeleteUnusedStats = autoDeleteUnusedStats; // field is initialised to false, so cleanup is opt-in
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
index 7ff26cd..4d2ce0c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
@@ -74,6 +74,26 @@ public class MomentStatsItemSet {
         statsItem.getValue().set(value);
     }
 
+    public void delValueByInfixKey(final String statsKey, String separator) {
+        final String infix = separator + statsKey + separator; // built once instead of once per entry
+        Iterator<Entry<String, MomentStatsItem>> entries = this.statsItemTable.entrySet().iterator();
+        while (entries.hasNext()) {
+            if (entries.next().getKey().contains(infix)) {
+                entries.remove(); // Iterator#remove: safe removal while traversing
+            }
+        }
+    }
+
+    public void delValueBySuffixKey(final String statsKey, String separator) {
+        final String suffix = separator + statsKey; // built once instead of once per entry
+        Iterator<Entry<String, MomentStatsItem>> entries = this.statsItemTable.entrySet().iterator();
+        while (entries.hasNext()) {
+            if (entries.next().getKey().endsWith(suffix)) {
+                entries.remove(); // Iterator#remove: safe removal while traversing
+            }
+        }
+    }
+
     public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
         MomentStatsItem statsItem = this.statsItemTable.get(statsKey);
         if (null == statsItem) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
index 1c2de33..bcf9665 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -158,6 +158,43 @@ public class StatsItemSet {
         statsItem.getTimes().addAndGet(incTimes);
     }
 
+    public void delValue(final String statsKey) {
+        // Map#remove is already a no-op for an absent key, so the former
+        // get-then-remove lookup only added a second probe of the table
+        // (and a check-then-act gap if the table is mutated concurrently).
+        this.statsItemTable.remove(statsKey);
+    }
+
+    public void delValueByPrefixKey(final String statsKey, String separator) {
+        final String prefix = statsKey + separator; // built once instead of once per entry
+        Iterator<Entry<String, StatsItem>> entries = this.statsItemTable.entrySet().iterator();
+        while (entries.hasNext()) {
+            if (entries.next().getKey().startsWith(prefix)) {
+                entries.remove(); // Iterator#remove: safe removal while traversing
+            }
+        }
+    }
+
+    public void delValueByInfixKey(final String statsKey, String separator) {
+        final String infix = separator + statsKey + separator; // built once instead of once per entry
+        Iterator<Entry<String, StatsItem>> entries = this.statsItemTable.entrySet().iterator();
+        while (entries.hasNext()) {
+            if (entries.next().getKey().contains(infix)) {
+                entries.remove(); // Iterator#remove: safe removal while traversing
+            }
+        }
+    }
+
+    public void delValueBySuffixKey(final String statsKey, String separator) {
+        final String suffix = separator + statsKey; // built once instead of once per entry
+        Iterator<Entry<String, StatsItem>> entries = this.statsItemTable.entrySet().iterator();
+        while (entries.hasNext()) {
+            if (entries.next().getKey().endsWith(suffix)) {
+                entries.remove(); // Iterator#remove: safe removal while traversing
+            }
+        }
+    }
+
     public StatsItem getAndCreateStatsItem(final String statsKey) {
         StatsItem statsItem = this.statsItemTable.get(statsKey);
         if (null == statsItem) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
index aa8bcfa..07e132f 100644
--- a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
@@ -37,11 +37,13 @@ public class BrokerConfigTest {
         brokerConfig.setBrokerId(0);
         brokerConfig.setBrokerClusterName("DefaultCluster");
         brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4");
+        brokerConfig.setAutoDeleteUnusedStats(true);
         assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
         assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
         assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4");
         assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
         assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
         assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
+        assertThat(brokerConfig.isAutoDeleteUnusedStats()).isEqualTo(true);
     }
 }
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index f59b5eb9..14d24d6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1029,6 +1029,10 @@ public class DefaultMessageStore implements MessageStore {
                 }
                 it.remove();
 
+                if (this.brokerConfig.isAutoDeleteUnusedStats()) {
+                    this.brokerStatsManager.onTopicDeleted(topic);
+                }
+
                 log.info("cleanUnusedTopic: {},topic destroyed", topic);
             }
         }
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 4adbed7..e151844 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -121,6 +121,26 @@ public class BrokerStatsManager {
         return null;
     }
 
+    public void onTopicDeleted(final String topic) {
+        this.statsTable.get(TOPIC_PUT_NUMS).delValue(topic); // these two sets are keyed by the bare topic
+        this.statsTable.get(TOPIC_PUT_SIZE).delValue(topic);
+        for (String name : new String[] {GROUP_GET_NUMS, GROUP_GET_SIZE, SNDBCK_PUT_NUMS}) {
+            this.statsTable.get(name).delValueByPrefixKey(topic, "@"); // keys look like topic@group
+        }
+        this.statsTable.get(GROUP_GET_LATENCY).delValueByInfixKey(topic, "@"); // keys look like queueId@topic@group
+        this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
+        this.momentStatsItemSetFallTime.delValueByInfixKey(topic, "@");
+    }
+
+    public void onGroupDeleted(final String group) {
+        // Per-group keys end with "@" + group (topic@group, queueId@topic@group), so a suffix match is used throughout.
+        for (String name : new String[] {GROUP_GET_NUMS, GROUP_GET_SIZE, SNDBCK_PUT_NUMS, GROUP_GET_LATENCY}) {
+            this.statsTable.get(name).delValueBySuffixKey(group, "@");
+        }
+        this.momentStatsItemSetFallSize.delValueBySuffixKey(group, "@");
+        this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
+    }
+
     public void incTopicPutNums(final String topic) {
         this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
     }
diff --git a/store/src/test/java/stats/BrokerStatsManagerTest.java b/store/src/test/java/stats/BrokerStatsManagerTest.java
new file mode 100644
index 0000000..1702072
--- /dev/null
+++ b/store/src/test/java/stats/BrokerStatsManagerTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package stats;
+
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.BROKER_PUT_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_SIZE;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_TIME;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_LATENCY;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_SIZE;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.SNDBCK_PUT_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS;
+import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BrokerStatsManagerTest {
+    private BrokerStatsManager brokerStatsManager;
+
+    private static final String TOPIC = "TOPIC_TEST";
+    private static final String GROUP_NAME = "GROUP_TEST";
+
+    @Before
+    public void init() {
+        brokerStatsManager = new BrokerStatsManager("DefaultCluster");
+        brokerStatsManager.start();
+    }
+
+    @After
+    public void destroy() {
+        brokerStatsManager.shutdown(); // pairs with start() in init()
+    }
+
+    @Test
+    public void testGetStatsItem() {
+        assertThat(brokerStatsManager.getStatsItem("TEST", "TEST")).isNull();
+    }
+
+    @Test
+    public void testIncTopicPutNums() {
+        brokerStatsManager.incTopicPutNums(TOPIC);
+        assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getTimes().doubleValue()).isEqualTo(1L);
+        brokerStatsManager.incTopicPutNums(TOPIC, 2, 2);
+        assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getValue().doubleValue()).isEqualTo(3L);
+    }
+
+    @Test
+    public void testIncTopicPutSize() {
+        brokerStatsManager.incTopicPutSize(TOPIC, 2);
+        assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC).getValue().doubleValue()).isEqualTo(2L);
+    }
+
+    @Test
+    public void testIncGroupGetNums() {
+        brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
+        String statsKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME);
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, statsKey).getValue().doubleValue()).isEqualTo(1L);
+    }
+
+    @Test
+    public void testIncGroupGetSize() {
+        brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 1);
+        String statsKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME);
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, statsKey).getValue().doubleValue()).isEqualTo(1L);
+    }
+
+    @Test
+    public void testIncGroupGetLatency() {
+        brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
+        String statsKey = String.format("%d@%s@%s", 1, TOPIC, GROUP_NAME);
+        assertThat(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, statsKey).getValue().doubleValue()).isEqualTo(1L);
+    }
+
+    @Test
+    public void testIncBrokerPutNums() {
+        brokerStatsManager.incBrokerPutNums();
+        assertThat(brokerStatsManager.getStatsItem(BROKER_PUT_NUMS, "DefaultCluster").getValue().doubleValue()).isEqualTo(1L);
+    }
+
+    @Test
+    public void testOnTopicDeleted() {
+        brokerStatsManager.incTopicPutNums(TOPIC);
+        brokerStatsManager.incTopicPutSize(TOPIC, 100);
+        brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
+        brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
+        brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
+        brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
+        brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
+        brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
+
+        brokerStatsManager.onTopicDeleted(TOPIC);
+        // NOTE(review): confirm getStatsItem can see the fall-size/time sets; if not, the last two checks pass vacuously.
+        Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC));
+        Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
+    }
+
+    @Test
+    public void testOnGroupDeleted() {
+        brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
+        brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
+        brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
+        brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
+        brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
+        brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
+
+        brokerStatsManager.onGroupDeleted(GROUP_NAME);
+        // NOTE(review): confirm getStatsItem can see the fall-size/time sets; if not, the last two checks pass vacuously.
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
+    }
+}