You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2022/01/06 00:32:50 UTC
[rocketmq-dashboard] branch master updated: [ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneously when deleting the consumerGroup. (#57)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git
The following commit(s) were added to refs/heads/master by this push:
new 529501c [ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneously when deleting the consumerGroup. (#57)
529501c is described below
commit 529501c0075d4d5e2aeb495fedf33b8ea386efe8
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Thu Jan 6 08:32:45 2022 +0800
[ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneously when deleting the consumerGroup. (#57)
* [ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneously when deleting the consumerGroup.
* modify method name
* Optimize the delete logic
* optimize code comments
---
.../service/impl/ConsumerServiceImpl.java | 28 ++++++++++++++++++++++
.../controller/ConsumerControllerTest.java | 2 ++
2 files changed, 30 insertions(+)
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index 2d60501..3ad85d4 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -23,12 +23,14 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -46,6 +48,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
@@ -65,6 +68,9 @@ import static com.google.common.base.Throwables.propagate;
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
+ @Resource
+ private RMQConfigure configure;
+
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
static {
@@ -290,11 +296,21 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
@Override
public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) {
+ Set<String> brokerSet = this.fetchBrokerNameSetBySubscriptionGroup(deleteSubGroupRequest.getGroupName());
+ List<String> brokerList = deleteSubGroupRequest.getBrokerNameList();
+ boolean deleteInNsFlag = false;
+ // If the list of brokers passed in by the request contains the list of brokers that the consumer is in, delete RETRY and DLQ topic in namesrv
+ if (brokerList.containsAll(brokerSet)) {
+ deleteInNsFlag = true;
+ }
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) {
logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName());
mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName(), true);
+ // Delete %RETRY%+Group and %DLQ%+Group in broker and namesrv
+ deleteResources(MixAll.RETRY_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
+ deleteResources(MixAll.DLQ_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
}
}
catch (Exception e) {
@@ -303,6 +319,18 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
return true;
}
+ private void deleteResources(String topic, String brokerName, ClusterInfo clusterInfo, boolean deleteInNsFlag) throws Exception {
+ mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic);
+ Set<String> nameServerSet = null;
+ if (StringUtils.isNotBlank(configure.getNamesrvAddr())) {
+ String[] ns = configure.getNamesrvAddr().split(";");
+ nameServerSet = new HashSet<>(Arrays.asList(ns));
+ }
+ if (deleteInNsFlag) {
+ mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
+ }
+ }
+
@Override
public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
try {
diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
index 87c3cc0..dfc3c22 100644
--- a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
+++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
@@ -182,6 +182,8 @@ public class ConsumerControllerTest extends BaseControllerTest {
final String url = "/consumer/deleteSubGroup.do";
{
doNothing().when(mqAdminExt).deleteSubscriptionGroup(any(), anyString());
+ doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString());
+ doNothing().when(mqAdminExt).deleteTopicInNameServer(any(), anyString());
}
DeleteSubGroupRequest request = new DeleteSubGroupRequest();
request.setBrokerNameList(Lists.newArrayList("broker-a"));