You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/01/21 07:30:39 UTC
[rocketmq] branch develop updated: [ISSUE #3724]: Polish the unit test of class ConsumeMessageConcurrentlyService. (#3725)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new bda0ae0 [ISSUE #3724]: Polish the unit test of class ConsumeMessageConcurrentlyService. (#3725)
bda0ae0 is described below
commit bda0ae0305863cc5fa2e0578439dfa4f4d9908b3
Author: 彭小漪 <64...@qq.com>
AuthorDate: Fri Jan 21 15:30:01 2022 +0800
[ISSUE #3724]: Polish the unit test of class ConsumeMessageConcurrentlyService. (#3725)
---
.../ConsumeMessageConcurrentlyServiceTest.java | 25 +++++++++++++++++-----
1 file changed, 20 insertions(+), 5 deletions(-)
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index 5d69aa2..7badc3b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -19,12 +19,16 @@ package org.apache.rocketmq.client.impl.consumer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -36,6 +40,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.message.MessageClientExt;
@@ -45,10 +50,10 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsItemSet;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -81,6 +86,13 @@ public class ConsumeMessageConcurrentlyServiceTest {
@Before
public void init() throws Exception {
+ ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+ Collection<MQClientInstance> instances = factoryTable.values();
+ for (MQClientInstance instance : instances) {
+ instance.shutdown();
+ }
+ factoryTable.clear();
+
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
@@ -100,12 +112,15 @@ public class ConsumeMessageConcurrentlyServiceTest {
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
- pushConsumer.start();
- mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
+ // suppress updateTopicRouteInfoFromNameServer
+ pushConsumer.changeInstanceNameToPID();
+ mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
+ mQClientFactory = spy(mQClientFactory);
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
+ factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
@@ -117,7 +132,6 @@ public class ConsumeMessageConcurrentlyServiceTest {
field.set(pushConsumerImpl, pullAPIWrapper);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
- mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
@@ -140,12 +154,13 @@ public class ConsumeMessageConcurrentlyServiceTest {
});
doReturn(new FindBrokerResult("127.0.0.1:10912", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
+ doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
+ pushConsumer.start();
}
- @Ignore
@Test
public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);