You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/11/28 08:37:07 UTC

[rocketmq-dashboard] branch master updated: [ISSUE #123]Optimize groupList.query (#124)

This is an automated email from the ASF dual-hosted git repository.

zhangjidi2016 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git


The following commit(s) were added to refs/heads/master by this push:
     new 86bdb06  [ISSUE #123]Optimize groupList.query (#124)
86bdb06 is described below

commit 86bdb0636494bc23751f5df90fdb56a87b928ca7
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Mon Nov 28 16:37:02 2022 +0800

    [ISSUE #123]Optimize groupList.query (#124)
    
    Co-authored-by: zhangjidi <zh...@cmss.chinamobile.com>
---
 .../service/impl/ConsumerServiceImpl.java          | 58 ++++++++++++++++++++--
 .../controller/ConsumerControllerTest.java         |  4 +-
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index 3ad85d4..b1011b7 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -29,6 +29,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import javax.annotation.Resource;
 import org.apache.commons.collections.CollectionUtils;
@@ -48,6 +56,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.dashboard.config.RMQConfigure;
 import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
 import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
@@ -60,12 +69,14 @@ import org.apache.rocketmq.dashboard.service.AbstractCommonService;
 import org.apache.rocketmq.dashboard.service.ConsumerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.stereotype.Service;
 
 import static com.google.common.base.Throwables.propagate;
 
 @Service
-public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
+public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
     private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
 
     @Resource
@@ -73,6 +84,31 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
 
     private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
 
+    private ExecutorService executorService;
+
+    @Override // Spring InitializingBean callback: builds the thread pool that the group-list query submits queryGroup tasks to.
+    public void afterPropertiesSet() {
+        Runtime runtime = Runtime.getRuntime();
+        int corePoolSize = Math.max(10, runtime.availableProcessors() * 2); // 2x CPU count, but never fewer than 10
+        int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2); // 2x CPU count, but never fewer than 20
+        ThreadFactory threadFactory = new ThreadFactory() { // names worker threads QueryGroup_1, QueryGroup_2, ...
+            private final AtomicLong threadIndex = new AtomicLong(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet());
+            }
+        };
+        RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy(); // NOTE(review): a discarded task never runs its finally-block countDown(), so the submitting query would wait out its full latch timeout - confirm this is acceptable
+        this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, // 60-second keep-alive for idle threads
+            new LinkedBlockingQueue<>(5000), threadFactory, handler); // bounded work queue: at most 5000 pending tasks
+    }
+
+    @Override // Spring DisposableBean callback: shuts down the executor created in afterPropertiesSet().
+    public void destroy() {
+        ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS); // hands the pool to ThreadUtils with a 10-second timeout argument
+    }
+
     static {
         SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
         SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
@@ -97,10 +133,26 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
         catch (Exception err) {
             throw Throwables.propagate(err);
         }
-        List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList();
+        List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
+        CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
         for (String consumerGroup : consumerGroupSet) {
-            groupConsumeInfoList.add(queryGroup(consumerGroup));
+            executorService.submit(() -> {
+                try {
+                    GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
+                    groupConsumeInfoList.add(consumeInfo);
+                } catch (Exception e) {
+                    logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
+                } finally {
+                    countDownLatch.countDown();
+                }
+            });
         }
+        try {
+            countDownLatch.await(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            logger.error("query consumerGroup countDownLatch await Exception", e);
+        }
+
         if (!skipSysGroup) {
             groupConsumeInfoList.stream().map(group -> {
                 if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
index dfc3c22..b95e80a 100644
--- a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
+++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
@@ -67,6 +67,7 @@ public class ConsumerControllerTest extends BaseControllerTest {
 
     @Before
     public void init() throws Exception {
+        consumerService.afterPropertiesSet();
         super.mockRmqConfigure();
         ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
         when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
@@ -93,9 +94,10 @@ public class ConsumerControllerTest extends BaseControllerTest {
         perform = mockMvc.perform(requestBuilder);
         perform.andExpect(status().isOk())
             .andExpect(jsonPath("$.data", hasSize(2)))
-            .andExpect(jsonPath("$.data[0].group").value("group_test"))
             .andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
             .andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
+        // executorService shutdown
+        consumerService.destroy();
     }
 
     @Test