You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/15 08:41:50 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Fix unit test
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new d4a656c Fix unit test
d4a656c is described below
commit d4a656c18c30f85653079da3984dcef0c9ae4d4b
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 15 16:41:31 2021 +0800
Fix unit test
---
.../apache/rocketmq/broker/BrokerOuterAPITest.java | 2 +-
.../apache/rocketmq/broker/BrokerStartupTest.java | 20 +++++-
.../broker/processor/AdminBrokerProcessorTest.java | 4 +-
.../broker/topic/TopicConfigManagerTest.java | 76 ----------------------
.../rocketmq/client/impl/MQClientAPIImpl.java | 12 ----
.../client/impl/factory/MQClientInstance.java | 3 +-
.../store/RemoteBrokerOffsetStoreTest.java | 10 ++-
.../protocol/header/GetMaxOffsetRequestHeader.java | 8 ---
.../apache/rocketmq/common/ConfigManagerTest.java | 5 +-
.../remoting/protocol/RemotingCommandTest.java | 10 ++-
.../tools/admin/DefaultMQAdminExtTest.java | 43 +++++-------
.../command/message/ConsumeMessageCommandTest.java | 4 +-
12 files changed, 57 insertions(+), 140 deletions(-)
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index 339ed11..daf771a 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -84,7 +84,7 @@ public class BrokerOuterAPITest {
private BrokerOuterAPI brokerOuterAPI;
public void init() throws Exception {
- brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), null);
+ brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), brokerController);
Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
field.setAccessible(true);
field.set(brokerOuterAPI, nettyRemotingClient);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
index c8da08d..ce370a3 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.broker;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Properties;
+import org.apache.rocketmq.common.MixAll;
import org.junit.Assert;
import org.junit.Test;
@@ -34,8 +35,21 @@ public class BrokerStartupTest {
Class<BrokerStartup> clazz = BrokerStartup.class;
Method method = clazz.getDeclaredMethod("properties2SystemEnv", Properties.class);
method.setAccessible(true);
- System.setProperty("rocketmq.namesrv.domain", "value");
- method.invoke(null, properties);
- Assert.assertEquals("value", System.getProperty("rocketmq.namesrv.domain"));
+ {
+ properties.put("rmqAddressServerDomain", "value1");
+ properties.put("rmqAddressServerSubGroup", "value2");
+ method.invoke(null, properties);
+ Assert.assertEquals("value1", System.getProperty("rocketmq.namesrv.domain"));
+ Assert.assertEquals("value2", System.getProperty("rocketmq.namesrv.domain.subgroup"));
+ }
+ {
+ properties.put("rmqAddressServerDomain", MixAll.WS_DOMAIN_NAME);
+ properties.put("rmqAddressServerSubGroup", MixAll.WS_DOMAIN_SUBGROUP);
+ method.invoke(null, properties);
+ Assert.assertEquals(MixAll.WS_DOMAIN_NAME, System.getProperty("rocketmq.namesrv.domain"));
+ Assert.assertEquals(MixAll.WS_DOMAIN_SUBGROUP, System.getProperty("rocketmq.namesrv.domain.subgroup"));
+ }
+
+
}
}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 3770dae..2141f2c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -101,7 +101,7 @@ public class AdminBrokerProcessorTest {
public void init() throws Exception {
brokerController.setMessageStore(messageStore);
- doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor();
+ //doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor();
adminBrokerProcessor = new AdminBrokerProcessor(brokerController);
@@ -203,7 +203,7 @@ public class AdminBrokerProcessorTest {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, requestHeader);
request.makeCustomHeaderToNet();
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
assertThat(response.getRemark()).contains("No topic in this broker.");
}
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
deleted file mode 100644
index 4a60437..0000000
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.topic;
-
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.nio.file.Files;
-import java.nio.file.Paths;
-
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class TopicConfigManagerTest {
- @Mock
- private DefaultMessageStore messageStore;
- @Mock
- private BrokerController brokerController;
-
- private TopicConfigManager topicConfigManager;
-
- private static final String topic = "FooBar";
- private static final String broker1Name = "broker1";
- private static final String broker1Addr = "127.0.0.1:12345";
- private static final int queueId1 = 1;
- private static final String broker2Name = "broker2";
- private static final String broker2Addr = "127.0.0.2:12345";
- private static final int queueId2 = 2;
-
- @Before
- public void before() {
- BrokerConfig brokerConfig = new BrokerConfig();
- brokerConfig.setBrokerName(broker1Name);
- when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
-
- when(brokerController.getMessageStore()).thenReturn(messageStore);
-
- MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
- messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir"));
- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
- this.topicConfigManager = new TopicConfigManager(brokerController);
- this.topicConfigManager.getTopicConfigTable().put(topic, new TopicConfig(topic));
- }
-
- @After
- public void after() throws Exception {
- if (topicConfigManager != null) {
- Files.deleteIfExists(Paths.get(topicConfigManager.configFilePath()));
- }
- }
-
-}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 0c15cff..8f0138d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1109,26 +1109,14 @@ public class MQClientAPIImpl {
public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
- return getMaxOffset(addr, topic, queueId, true, false, timeoutMillis);
- }
-
- public long getMaxOffset(final String addr, final String topic, final int queueId, boolean committed,
- boolean fromLogicalQueue,
- final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException {
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
- requestHeader.setCommitted(committed);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
- HashMap<String, String> extFields = response.getExtFields();
- if (extFields != null && extFields.containsKey(MessageConst.PROPERTY_REDIRECT)) {
- throw new MQRedirectException(response.getBody());
- }
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GetMaxOffsetResponseHeader responseHeader =
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 793189e..0181951 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1020,7 +1020,8 @@ public class MQClientInstance {
public String getBrokerNameFromMessageQueue(final MessageQueue mq) {
- if (topicEndPointsTable.get(mq.getTopic()) != null
+ if (topicEndPointsTable != null
+ && topicEndPointsTable.get(mq.getTopic()) != null
&& !topicEndPointsTable.get(mq.getTopic()).isEmpty()) {
return topicEndPointsTable.get(mq.getTopic()).get(mq);
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index f762910..73cfefb 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -20,10 +20,12 @@ import java.util.Collections;
import java.util.HashSet;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.OffsetNotFoundException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -60,6 +62,7 @@ public class RemoteBrokerOffsetStoreTest {
when(mQClientFactory.getClientId()).thenReturn(clientId);
when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
+ when(mQClientFactory.getBrokerNameFromMessageQueue(any())).thenReturn(brokerName);
}
@Test
@@ -84,10 +87,15 @@ public class RemoteBrokerOffsetStoreTest {
offsetStore.updateOffset(messageQueue, 1024, false);
- doThrow(new MQBrokerException(-1, "", null))
+ doThrow(new OffsetNotFoundException(ResponseCode.PULL_NOT_FOUND, "", null))
.when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
+
+ doThrow(new MQBrokerException(-1, "", null))
+ .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-2);
+
doThrow(new RemotingException("", null))
.when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-2);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index 2a577d7..e961af9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -29,7 +29,6 @@ public class GetMaxOffsetRequestHeader extends TopicQueueRequestHeader {
private String topic;
@CFNotNull
private Integer queueId;
- private boolean committed;
@Override
public void checkFields() throws RemotingCommandException {
@@ -55,11 +54,4 @@ public class GetMaxOffsetRequestHeader extends TopicQueueRequestHeader {
this.queueId = queueId;
}
- public void setCommitted(boolean committed) {
- this.committed = committed;
- }
-
- public boolean isCommitted() {
- return committed;
- }
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java b/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java
index a884b6a..a61ec4c 100644
--- a/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java
@@ -15,13 +15,10 @@ package org.apache.rocketmq.common;/*
* limitations under the License.
*/
-import org.apache.rocketmq.common.ConfigManager;
-import org.apache.rocketmq.common.MixAll;
-import org.junit.Test;
-
import java.io.File;
import java.io.PrintWriter;
import java.lang.reflect.Method;
+import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index f2f6935..a0fc765 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -209,7 +209,11 @@ public class RemotingCommandTest {
SubExtFieldsHeader subExtFieldsHeader = new SubExtFieldsHeader();
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(1, subExtFieldsHeader);
Field[] fields = remotingCommand.getClazzFields(subExtFieldsHeader.getClass());
- Assert.assertEquals(7, fields.length);
+ Set<String> fieldNames = new HashSet<>();
+ for (Field field: fields) {
+ fieldNames.add(field.getName());
+ }
+ Assert.assertTrue(fields.length >= 7);
Set<String> names = new HashSet<>();
names.add("stringValue");
names.add("intValue");
@@ -218,8 +222,8 @@ public class RemotingCommandTest {
names.add("doubleValue");
names.add("name");
names.add("value");
- for (Field field : fields) {
- Assert.assertTrue(names.contains(field.getName()));
+ for (String name: names) {
+ Assert.assertTrue(fieldNames.contains(name));
}
remotingCommand.makeCustomHeaderToNet();
SubExtFieldsHeader other = (SubExtFieldsHeader) remotingCommand.decodeCommandCustomHeader(subExtFieldsHeader.getClass());
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index deb3d05..4514ef4 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -16,6 +16,18 @@
*/
package org.apache.rocketmq.tools.admin;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -52,6 +64,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -66,25 +79,11 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -248,7 +247,8 @@ public class DefaultMQAdminExtTest {
put("topic_test_examine_topicConfig", new TopicConfig("topic_test_examine_topicConfig"));
}
});
- when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper);
+ //when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper);
+ when(mQClientAPIImpl.getTopicConfig(anyString(), anyString(), anyLong())).thenReturn(new TopicConfigAndQueueMapping(new TopicConfig("topic_test_examine_topicConfig"), null));
}
@AfterClass
@@ -439,7 +439,7 @@ public class DefaultMQAdminExtTest {
@Test
public void testMaxOffset() throws Exception {
- when(mQClientAPIImpl.getMaxOffset(anyString(), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(100L);
+ when(mQClientAPIImpl.getMaxOffset(anyString(), anyString(), anyInt(), anyLong())).thenReturn(100L);
assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, broker1Name, 0))).isEqualTo(100L);
}
@@ -451,19 +451,8 @@ public class DefaultMQAdminExtTest {
assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, broker1Name, 0), System.currentTimeMillis())).isEqualTo(101L);
}
- @Test
- public void testMaxOffset_LogicalQueue() throws Exception {
- when(mQClientAPIImpl.getMaxOffset(eq(broker2Addr), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(10L);
- assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX, 0))).isEqualTo(1010L);
- }
- @Test
- public void testSearchOffset_LogicalQueue() throws Exception {
- when(mQClientAPIImpl.searchOffset(eq(broker2Addr), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(11L);
-
- assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX, 0), System.currentTimeMillis())).isEqualTo(1011L);
- }
@Test
public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException {
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
index 1154395..25aa3f8 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -113,7 +113,7 @@ public class ConsumeMessageCommandTest {
@Test
public void testExecuteDefaultWhenPullMessageByQueueGotException() throws SubCommandException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, IllegalAccessException {
DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
- when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(MQClientException.class);
Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true);
producerField.set(consumeMessageCommand, defaultMQPullConsumer);
@@ -135,7 +135,7 @@ public class ConsumeMessageCommandTest {
@Test
public void testExecuteByConditionWhenPullMessageByQueueGotException() throws IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException {
DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
- when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(MQClientException.class);
Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true);
producerField.set(consumeMessageCommand, defaultMQPullConsumer);