You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2022/01/06 00:32:50 UTC

[rocketmq-dashboard] branch master updated: [ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneously when deleting the consumerGroup. (#57)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git


The following commit(s) were added to refs/heads/master by this push:
     new 529501c  [ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneously when deleting the consumerGroup. (#57)
529501c is described below

commit 529501c0075d4d5e2aeb495fedf33b8ea386efe8
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Thu Jan 6 08:32:45 2022 +0800

    [ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneously when deleting the consumerGroup. (#57)
    
    * [ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneously when deleting the consumerGroup.
    
    * modify method name
    
    * Optimize the delete logic
    
    * optimize code comments
---
 .../service/impl/ConsumerServiceImpl.java          | 28 ++++++++++++++++++++++
 .../controller/ConsumerControllerTest.java         |  2 ++
 2 files changed, 30 insertions(+)

diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index 2d60501..3ad85d4 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -23,12 +23,14 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import javax.annotation.Resource;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -46,6 +48,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.dashboard.config.RMQConfigure;
 import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
 import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
 import org.apache.rocketmq.dashboard.model.QueueStatInfo;
@@ -65,6 +68,9 @@ import static com.google.common.base.Throwables.propagate;
 public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
     private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
 
+    @Resource
+    private RMQConfigure configure;
+
     private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
 
     static {
@@ -290,11 +296,21 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
 
     @Override
     public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) {
+        Set<String> brokerSet = this.fetchBrokerNameSetBySubscriptionGroup(deleteSubGroupRequest.getGroupName());
+        List<String> brokerList = deleteSubGroupRequest.getBrokerNameList();
+        boolean deleteInNsFlag = false;
+        // If the requested broker list covers every broker that holds this consumer group, also delete its RETRY and DLQ topics from the name server
+        if (brokerList.containsAll(brokerSet)) {
+            deleteInNsFlag = true;
+        }
         try {
             ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
             for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) {
                 logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName());
                 mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName(), true);
+                // Delete the %RETRY%<group> and %DLQ%<group> topics from this broker and, when deleteInNsFlag is set, from the name server
+                deleteResources(MixAll.RETRY_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
+                deleteResources(MixAll.DLQ_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
             }
         }
         catch (Exception e) {
@@ -303,6 +319,18 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
         return true;
     }
 
+    private void deleteResources(String topic, String brokerName, ClusterInfo clusterInfo, boolean deleteInNsFlag) throws Exception {
+        mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic);
+        Set<String> nameServerSet = null;
+        if (StringUtils.isNotBlank(configure.getNamesrvAddr())) {
+            String[] ns = configure.getNamesrvAddr().split(";");
+            nameServerSet = new HashSet<>(Arrays.asList(ns));
+        }
+        if (deleteInNsFlag) {
+            mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
+        }
+    }
+
     @Override
     public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
         try {
diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
index 87c3cc0..dfc3c22 100644
--- a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
+++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
@@ -182,6 +182,8 @@ public class ConsumerControllerTest extends BaseControllerTest {
         final String url = "/consumer/deleteSubGroup.do";
         {
             doNothing().when(mqAdminExt).deleteSubscriptionGroup(any(), anyString());
+            doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString());
+            doNothing().when(mqAdminExt).deleteTopicInNameServer(any(), anyString());
         }
         DeleteSubGroupRequest request = new DeleteSubGroupRequest();
         request.setBrokerNameList(Lists.newArrayList("broker-a"));