You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/11/24 02:44:31 UTC

[rocketmq-dashboard] branch optimze_queryConsumer created (now e3b769d)

This is an automated email from the ASF dual-hosted git repository.

zhangjidi2016 pushed a change to branch optimze_queryConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git


      at e3b769d  [ISSUE #123]Optimize groupList.query

This branch includes the following new commits:

     new e3b769d  [ISSUE #123]Optimize groupList.query

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[rocketmq-dashboard] 01/01: [ISSUE #123]Optimize groupList.query

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangjidi2016 pushed a commit to branch optimze_queryConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git

commit e3b769db6aea68279ebbc136910ce304201c1a85
Author: zhangjidi <zh...@cmss.chinamobile.com>
AuthorDate: Thu Nov 24 10:43:07 2022 +0800

    [ISSUE #123]Optimize groupList.query
---
 .../service/impl/ConsumerServiceImpl.java          | 58 ++++++++++++++++++++--
 .../controller/ConsumerControllerTest.java         |  4 +-
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index 3ad85d4..b1011b7 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -29,6 +29,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import javax.annotation.Resource;
 import org.apache.commons.collections.CollectionUtils;
@@ -48,6 +56,7 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.dashboard.config.RMQConfigure;
 import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
 import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
@@ -60,12 +69,14 @@ import org.apache.rocketmq.dashboard.service.AbstractCommonService;
 import org.apache.rocketmq.dashboard.service.ConsumerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.stereotype.Service;
 
 import static com.google.common.base.Throwables.propagate;
 
 @Service
-public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
+public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
     private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
 
     @Resource
@@ -73,6 +84,31 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
 
     private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
 
+    private ExecutorService executorService;
+
+    @Override
+    public void afterPropertiesSet() {
+        Runtime runtime = Runtime.getRuntime();
+        int corePoolSize = Math.max(10, runtime.availableProcessors() * 2);
+        int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2);
+        ThreadFactory threadFactory = new ThreadFactory() {
+            private final AtomicLong threadIndex = new AtomicLong(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet());
+            }
+        };
+        RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
+        this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(5000), threadFactory, handler);
+    }
+
+    @Override
+    public void destroy() {
+        ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS);
+    }
+
     static {
         SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
         SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
@@ -97,10 +133,26 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum
         catch (Exception err) {
             throw Throwables.propagate(err);
         }
-        List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList();
+        List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
+        CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
         for (String consumerGroup : consumerGroupSet) {
-            groupConsumeInfoList.add(queryGroup(consumerGroup));
+            executorService.submit(() -> {
+                try {
+                    GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
+                    groupConsumeInfoList.add(consumeInfo);
+                } catch (Exception e) {
+                    logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
+                } finally {
+                    countDownLatch.countDown();
+                }
+            });
         }
+        try {
+            countDownLatch.await(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            logger.error("query consumerGroup countDownLatch await Exception", e);
+        }
+
         if (!skipSysGroup) {
             groupConsumeInfoList.stream().map(group -> {
                 if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
index dfc3c22..b95e80a 100644
--- a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
+++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
@@ -67,6 +67,7 @@ public class ConsumerControllerTest extends BaseControllerTest {
 
     @Before
     public void init() throws Exception {
+        consumerService.afterPropertiesSet();
         super.mockRmqConfigure();
         ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
         when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
@@ -93,9 +94,10 @@ public class ConsumerControllerTest extends BaseControllerTest {
         perform = mockMvc.perform(requestBuilder);
         perform.andExpect(status().isOk())
             .andExpect(jsonPath("$.data", hasSize(2)))
-            .andExpect(jsonPath("$.data[0].group").value("group_test"))
             .andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
             .andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
+        // executorService shutdown
+        consumerService.destroy();
     }
 
     @Test