You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/01/21 07:30:39 UTC

[rocketmq] branch develop updated: [ISSUE #3724]: Polish the unit test of class ConsumeMessageConcurrentlyService. (#3725)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bda0ae0  [ISSUE #3724]: Polish the unit test of class ConsumeMessageConcurrentlyService. (#3725)
bda0ae0 is described below

commit bda0ae0305863cc5fa2e0578439dfa4f4d9908b3
Author: 彭小漪 <64...@qq.com>
AuthorDate: Fri Jan 21 15:30:01 2022 +0800

    [ISSUE #3724]: Polish the unit test of class ConsumeMessageConcurrentlyService. (#3725)
---
 .../ConsumeMessageConcurrentlyServiceTest.java     | 25 +++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index 5d69aa2..7badc3b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -19,12 +19,16 @@ package org.apache.rocketmq.client.impl.consumer;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -36,6 +40,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.stat.ConsumerStatsManager;
 import org.apache.rocketmq.common.message.MessageClientExt;
@@ -45,10 +50,10 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.stats.StatsItem;
 import org.apache.rocketmq.common.stats.StatsItemSet;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -81,6 +86,13 @@ public class ConsumeMessageConcurrentlyServiceTest {
 
     @Before
     public void init() throws Exception {
+        ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+        Collection<MQClientInstance> instances = factoryTable.values();
+        for (MQClientInstance instance : instances) {
+            instance.shutdown();
+        }
+        factoryTable.clear();
+
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
         pushConsumer = new DefaultMQPushConsumer(consumerGroup);
         pushConsumer.setNamesrvAddr("127.0.0.1:9876");
@@ -100,12 +112,15 @@ public class ConsumeMessageConcurrentlyServiceTest {
         field.setAccessible(true);
         field.set(pushConsumerImpl, rebalancePushImpl);
         pushConsumer.subscribe(topic, "*");
-        pushConsumer.start();
 
-        mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
+        // Obtain and spy the client instance before start() so that updateTopicRouteInfoFromNameServer can be stubbed (suppressed) further down
+        pushConsumer.changeInstanceNameToPID();
+        mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
+        mQClientFactory = spy(mQClientFactory);
         field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
         field.setAccessible(true);
         field.set(pushConsumerImpl, mQClientFactory);
+        factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
 
         field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
         field.setAccessible(true);
@@ -117,7 +132,6 @@ public class ConsumeMessageConcurrentlyServiceTest {
         field.set(pushConsumerImpl, pullAPIWrapper);
 
         pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
-        mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
 
         when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
                 anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
@@ -140,12 +154,13 @@ public class ConsumeMessageConcurrentlyServiceTest {
                 });
 
         doReturn(new FindBrokerResult("127.0.0.1:10912", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
+        doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
         Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
         messageQueueSet.add(createPullRequest().getMessageQueue());
         pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
+        pushConsumer.start();
     }
 
-    @Ignore
     @Test
     public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);