You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/01/27 06:58:19 UTC

[rocketmq] branch develop updated: [ISSUE #3789] optimize: Tag the name of consuming thread whith consumeGroup. (#3795)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d5be91f  [ISSUE #3789] optimize: Tag the name of  consuming thread whith consumeGroup. (#3795)
d5be91f is described below

commit d5be91fa00136d8c2df37d83ea94dab37d630939
Author: 彭小漪 <64...@qq.com>
AuthorDate: Thu Jan 27 14:58:09 2022 +0800

    [ISSUE #3789] optimize: Tag the name of  consuming thread whith consumeGroup. (#3795)
    
    * It is useful for debug.
---
 .../ConsumeMessageConcurrentlyService.java         |   8 +-
 .../consumer/ConsumeMessageOrderlyService.java     |   8 +-
 .../ConsumeMessageConcurrentlyServiceTest.java     |  32 ++++
 .../consumer/ConsumeMessageOrderlyServiceTest.java | 177 +++++++++++++++++++++
 4 files changed, 223 insertions(+), 2 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 537dbee..384f3f1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -70,13 +70,19 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
         this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
 
+        String consumeThreadPrefix = null;
+        if (consumerGroup.length() > 100) {
+            consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup.substring(0, 100)).append("_").toString();
+        } else {
+            consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
+        }
         this.consumeExecutor = new ThreadPoolExecutor(
             this.defaultMQPushConsumer.getConsumeThreadMin(),
             this.defaultMQPushConsumer.getConsumeThreadMax(),
             1000 * 60,
             TimeUnit.MILLISECONDS,
             this.consumeRequestQueue,
-            new ThreadFactoryImpl("ConsumeMessageThread_"));
+            new ThreadFactoryImpl(consumeThreadPrefix));
 
         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
         this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 8d92b57..812e8ab 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -75,13 +75,19 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
         this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
         this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
 
+        String consumeThreadPrefix = null;
+        if (consumerGroup.length() > 100) {
+            consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup.substring(0, 100)).append("_").toString();
+        } else {
+            consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
+        }
         this.consumeExecutor = new ThreadPoolExecutor(
             this.defaultMQPushConsumer.getConsumeThreadMin(),
             this.defaultMQPushConsumer.getConsumeThreadMax(),
             1000 * 60,
             TimeUnit.MILLISECONDS,
             this.consumeRequestQueue,
-            new ThreadFactoryImpl("ConsumeMessageThread_"));
+            new ThreadFactoryImpl(consumeThreadPrefix));
 
         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
     }
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index 7badc3b..c12f2fc 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -231,4 +231,36 @@ public class ConsumeMessageConcurrentlyServiceTest {
         }
         return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
     }
+
+    @Test
+    public void testConsumeThreadName() throws Exception {
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        final AtomicReference<String> consumeThreadName = new AtomicReference<String>();
+
+        StringBuilder consumeGroup2 = new StringBuilder();
+        for (int i = 0; i < 101; i++) {
+            consumeGroup2.append(i).append("#");
+        }
+        pushConsumer.setConsumerGroup(consumeGroup2.toString());
+        ConsumeMessageConcurrentlyService  normalServie = new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                            ConsumeConcurrentlyContext context) {
+                consumeThreadName.set(Thread.currentThread().getName());
+                countDownLatch.countDown();
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(normalServie);
+
+        PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
+        pullMessageService.executePullRequestImmediately(createPullRequest());
+        countDownLatch.await();
+        System.out.println(consumeThreadName.get());
+        if (consumeGroup2.length() <= 100) {
+            assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + consumeGroup2 + "_");
+        } else {
+            assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + consumeGroup2.substring(0, 100) + "_");
+        }
+    }
 }
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
index 4cfa011..8ea1727 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
@@ -16,30 +16,146 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.CMResult;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public class ConsumeMessageOrderlyServiceTest {
     private String consumerGroup;
     private String topic = "FooBar";
     private String brokerName = "BrokerA";
     private DefaultMQPushConsumer pushConsumer;
+    private MQClientInstance mQClientFactory;
 
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+    private PullAPIWrapper pullAPIWrapper;
+    private RebalancePushImpl rebalancePushImpl;
     @Before
     public void init() throws Exception {
+        ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+        Collection<MQClientInstance> instances = factoryTable.values();
+        for (MQClientInstance instance : instances) {
+            instance.shutdown();
+        }
+        factoryTable.clear();
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
         pushConsumer = new DefaultMQPushConsumer(consumerGroup);
+
+        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
+        pushConsumer.setPullInterval(60 * 1000);
+
+        pushConsumer.registerMessageListener(new MessageListenerOrderly() {
+
+            @Override
+            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+                return ConsumeOrderlyStatus.SUCCESS;
+            }
+        });
+
+
+        DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+        rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
+        Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, rebalancePushImpl);
+        pushConsumer.subscribe(topic, "*");
+
+        // suppress updateTopicRouteInfoFromNameServer
+        pushConsumer.changeInstanceNameToPID();
+        mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
+        mQClientFactory = spy(mQClientFactory);
+        field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, mQClientFactory);
+        factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
+
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
+        field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, pullAPIWrapper);
+
+        pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
+
+        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+                anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+                .thenAnswer(new Answer<PullResult>() {
+                    @Override
+                    public PullResult answer(InvocationOnMock mock) throws Throwable {
+                        PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                        MessageClientExt messageClientExt = new MessageClientExt();
+                        messageClientExt.setTopic(topic);
+                        messageClientExt.setQueueId(0);
+                        messageClientExt.setMsgId("123");
+                        messageClientExt.setBody(new byte[] {'a'});
+                        messageClientExt.setOffsetMsgId("234");
+                        messageClientExt.setBornHost(new InetSocketAddress(8080));
+                        messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                        PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+                        ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
+                        return pullResult;
+                    }
+                });
+
+        doReturn(new FindBrokerResult("127.0.0.1:10912", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
+        doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+        Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+        messageQueueSet.add(createPullRequest().getMessageQueue());
+        pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
+        pushConsumer.start();
     }
 
     @Test
@@ -83,4 +199,65 @@ public class ConsumeMessageOrderlyServiceTest {
         assertTrue(consumeMessageOrderlyService.consumeMessageDirectly(msg, brokerName).getConsumeResult().equals(CMResult.CR_THROW_EXCEPTION));
     }
 
+    @Test
+    public void testConsumeThreadName() throws Exception {
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        final AtomicReference<String> consumeThreadName = new AtomicReference<String>();
+
+        StringBuilder consumeGroup2 = new StringBuilder();
+        for (int i = 0; i < 101; i++) {
+            consumeGroup2.append(i).append("#");
+        }
+        pushConsumer.setConsumerGroup(consumeGroup2.toString());
+
+        MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
+            @Override
+            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+                consumeThreadName.set(Thread.currentThread().getName());
+                countDownLatch.countDown();
+                return ConsumeOrderlyStatus.SUCCESS;
+            }
+        };
+        ConsumeMessageOrderlyService consumeMessageOrderlyService = new ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), listenerOrderly);
+        pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(consumeMessageOrderlyService);
+
+
+        PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
+        pullMessageService.executePullRequestImmediately(createPullRequest());
+        countDownLatch.await();
+        System.out.println(consumeThreadName.get());
+        if (consumeGroup2.length() <= 100) {
+            assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + consumeGroup2 + "_");
+        } else {
+            assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + consumeGroup2.substring(0, 100) + "_");
+        }
+    }
+
+    private PullRequest createPullRequest() {
+        PullRequest pullRequest = new PullRequest();
+        pullRequest.setConsumerGroup(consumerGroup);
+        pullRequest.setNextOffset(1024);
+
+        MessageQueue messageQueue = new MessageQueue();
+        messageQueue.setBrokerName(brokerName);
+        messageQueue.setQueueId(0);
+        messageQueue.setTopic(topic);
+        pullRequest.setMessageQueue(messageQueue);
+        ProcessQueue processQueue = new ProcessQueue();
+        processQueue.setLocked(true);
+        processQueue.setLastLockTimestamp(System.currentTimeMillis());
+        pullRequest.setProcessQueue(processQueue);
+
+        return pullRequest;
+    }
+
+    private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
+                                           List<MessageExt> messageExtList) throws Exception {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        for (MessageExt messageExt : messageExtList) {
+            outputStream.write(MessageDecoder.encode(messageExt, false));
+        }
+        return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
+    }
+
 }