You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/01/27 06:58:19 UTC
[rocketmq] branch develop updated: [ISSUE #3789] optimize: Tag the name of consuming thread with consumeGroup. (#3795)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d5be91f [ISSUE #3789] optimize: Tag the name of consuming thread with consumeGroup. (#3795)
d5be91f is described below
commit d5be91fa00136d8c2df37d83ea94dab37d630939
Author: 彭小漪 <64...@qq.com>
AuthorDate: Thu Jan 27 14:58:09 2022 +0800
[ISSUE #3789] optimize: Tag the name of consuming thread with consumeGroup. (#3795)
* This is useful for debugging.
---
.../ConsumeMessageConcurrentlyService.java | 8 +-
.../consumer/ConsumeMessageOrderlyService.java | 8 +-
.../ConsumeMessageConcurrentlyServiceTest.java | 32 ++++
.../consumer/ConsumeMessageOrderlyServiceTest.java | 177 +++++++++++++++++++++
4 files changed, 223 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 537dbee..384f3f1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -70,13 +70,19 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
+ String consumeThreadPrefix = null;
+ if (consumerGroup.length() > 100) {
+ consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup.substring(0, 100)).append("_").toString();
+ } else {
+ consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
+ }
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
- new ThreadFactoryImpl("ConsumeMessageThread_"));
+ new ThreadFactoryImpl(consumeThreadPrefix));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 8d92b57..812e8ab 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -75,13 +75,19 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
+ String consumeThreadPrefix = null;
+ if (consumerGroup.length() > 100) {
+ consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup.substring(0, 100)).append("_").toString();
+ } else {
+ consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
+ }
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
- new ThreadFactoryImpl("ConsumeMessageThread_"));
+ new ThreadFactoryImpl(consumeThreadPrefix));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index 7badc3b..c12f2fc 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -231,4 +231,36 @@ public class ConsumeMessageConcurrentlyServiceTest {
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
+
+ @Test
+ public void testConsumeThreadName() throws Exception {
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ final AtomicReference<String> consumeThreadName = new AtomicReference<String>();
+
+ StringBuilder consumeGroup2 = new StringBuilder();
+ for (int i = 0; i < 101; i++) {
+ consumeGroup2.append(i).append("#");
+ }
+ pushConsumer.setConsumerGroup(consumeGroup2.toString());
+ ConsumeMessageConcurrentlyService normalServie = new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ consumeThreadName.set(Thread.currentThread().getName());
+ countDownLatch.countDown();
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(normalServie);
+
+ PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
+ pullMessageService.executePullRequestImmediately(createPullRequest());
+ countDownLatch.await();
+ System.out.println(consumeThreadName.get());
+ if (consumeGroup2.length() <= 100) {
+ assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + consumeGroup2 + "_");
+ } else {
+ assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + consumeGroup2.substring(0, 100) + "_");
+ }
+ }
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
index 4cfa011..8ea1727 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
@@ -16,30 +16,146 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.RPCHook;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
public class ConsumeMessageOrderlyServiceTest {
private String consumerGroup;
private String topic = "FooBar";
private String brokerName = "BrokerA";
private DefaultMQPushConsumer pushConsumer;
+ private MQClientInstance mQClientFactory;
+ @Mock
+ private MQClientAPIImpl mQClientAPIImpl;
+ private PullAPIWrapper pullAPIWrapper;
+ private RebalancePushImpl rebalancePushImpl;
@Before
public void init() throws Exception {
+ ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+ Collection<MQClientInstance> instances = factoryTable.values();
+ for (MQClientInstance instance : instances) {
+ instance.shutdown();
+ }
+ factoryTable.clear();
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
+
+ pushConsumer.setNamesrvAddr("127.0.0.1:9876");
+ pushConsumer.setPullInterval(60 * 1000);
+
+ pushConsumer.registerMessageListener(new MessageListenerOrderly() {
+
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+ return ConsumeOrderlyStatus.SUCCESS;
+ }
+ });
+
+
+ DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+ rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
+ Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
+ field.setAccessible(true);
+ field.set(pushConsumerImpl, rebalancePushImpl);
+ pushConsumer.subscribe(topic, "*");
+
+ // suppress updateTopicRouteInfoFromNameServer
+ pushConsumer.changeInstanceNameToPID();
+ mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
+ mQClientFactory = spy(mQClientFactory);
+ field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
+ field.setAccessible(true);
+ field.set(pushConsumerImpl, mQClientFactory);
+ factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
+
+ mQClientAPIImpl = mock(MQClientAPIImpl.class);
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mQClientFactory, mQClientAPIImpl);
+
+ pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
+ field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
+ field.setAccessible(true);
+ field.set(pushConsumerImpl, pullAPIWrapper);
+
+ pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
+
+ when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+ anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+ .thenAnswer(new Answer<PullResult>() {
+ @Override
+ public PullResult answer(InvocationOnMock mock) throws Throwable {
+ PullMessageRequestHeader requestHeader = mock.getArgument(1);
+ MessageClientExt messageClientExt = new MessageClientExt();
+ messageClientExt.setTopic(topic);
+ messageClientExt.setQueueId(0);
+ messageClientExt.setMsgId("123");
+ messageClientExt.setBody(new byte[] {'a'});
+ messageClientExt.setOffsetMsgId("234");
+ messageClientExt.setBornHost(new InetSocketAddress(8080));
+ messageClientExt.setStoreHost(new InetSocketAddress(8080));
+ PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+ ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
+ return pullResult;
+ }
+ });
+
+ doReturn(new FindBrokerResult("127.0.0.1:10912", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
+ doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+ Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+ messageQueueSet.add(createPullRequest().getMessageQueue());
+ pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
+ pushConsumer.start();
}
@Test
@@ -83,4 +199,65 @@ public class ConsumeMessageOrderlyServiceTest {
assertTrue(consumeMessageOrderlyService.consumeMessageDirectly(msg, brokerName).getConsumeResult().equals(CMResult.CR_THROW_EXCEPTION));
}
+ @Test
+ public void testConsumeThreadName() throws Exception {
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ final AtomicReference<String> consumeThreadName = new AtomicReference<String>();
+
+ StringBuilder consumeGroup2 = new StringBuilder();
+ for (int i = 0; i < 101; i++) {
+ consumeGroup2.append(i).append("#");
+ }
+ pushConsumer.setConsumerGroup(consumeGroup2.toString());
+
+ MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+ consumeThreadName.set(Thread.currentThread().getName());
+ countDownLatch.countDown();
+ return ConsumeOrderlyStatus.SUCCESS;
+ }
+ };
+ ConsumeMessageOrderlyService consumeMessageOrderlyService = new ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), listenerOrderly);
+ pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(consumeMessageOrderlyService);
+
+
+ PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
+ pullMessageService.executePullRequestImmediately(createPullRequest());
+ countDownLatch.await();
+ System.out.println(consumeThreadName.get());
+ if (consumeGroup2.length() <= 100) {
+ assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + consumeGroup2 + "_");
+ } else {
+ assertThat(consumeThreadName.get()).startsWith("ConsumeMessageThread_" + consumeGroup2.substring(0, 100) + "_");
+ }
+ }
+
+ private PullRequest createPullRequest() {
+ PullRequest pullRequest = new PullRequest();
+ pullRequest.setConsumerGroup(consumerGroup);
+ pullRequest.setNextOffset(1024);
+
+ MessageQueue messageQueue = new MessageQueue();
+ messageQueue.setBrokerName(brokerName);
+ messageQueue.setQueueId(0);
+ messageQueue.setTopic(topic);
+ pullRequest.setMessageQueue(messageQueue);
+ ProcessQueue processQueue = new ProcessQueue();
+ processQueue.setLocked(true);
+ processQueue.setLastLockTimestamp(System.currentTimeMillis());
+ pullRequest.setProcessQueue(processQueue);
+
+ return pullRequest;
+ }
+
+ private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
+ List<MessageExt> messageExtList) throws Exception {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ for (MessageExt messageExt : messageExtList) {
+ outputStream.write(MessageDecoder.encode(messageExt, false));
+ }
+ return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
+ }
+
}