You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/15 08:41:50 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Fix unit test

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new d4a656c  Fix unit test
d4a656c is described below

commit d4a656c18c30f85653079da3984dcef0c9ae4d4b
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 15 16:41:31 2021 +0800

    Fix unit test
---
 .../apache/rocketmq/broker/BrokerOuterAPITest.java |  2 +-
 .../apache/rocketmq/broker/BrokerStartupTest.java  | 20 +++++-
 .../broker/processor/AdminBrokerProcessorTest.java |  4 +-
 .../broker/topic/TopicConfigManagerTest.java       | 76 ----------------------
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 12 ----
 .../client/impl/factory/MQClientInstance.java      |  3 +-
 .../store/RemoteBrokerOffsetStoreTest.java         | 10 ++-
 .../protocol/header/GetMaxOffsetRequestHeader.java |  8 ---
 .../apache/rocketmq/common/ConfigManagerTest.java  |  5 +-
 .../remoting/protocol/RemotingCommandTest.java     | 10 ++-
 .../tools/admin/DefaultMQAdminExtTest.java         | 43 +++++-------
 .../command/message/ConsumeMessageCommandTest.java |  4 +-
 12 files changed, 57 insertions(+), 140 deletions(-)

diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index 339ed11..daf771a 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -84,7 +84,7 @@ public class BrokerOuterAPITest {
     private BrokerOuterAPI brokerOuterAPI;
 
     public void init() throws Exception {
-        brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), null);
+        brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), brokerController);
         Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
         field.setAccessible(true);
         field.set(brokerOuterAPI, nettyRemotingClient);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
index c8da08d..ce370a3 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.broker;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Properties;
+import org.apache.rocketmq.common.MixAll;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -34,8 +35,21 @@ public class BrokerStartupTest {
         Class<BrokerStartup> clazz = BrokerStartup.class;
         Method method = clazz.getDeclaredMethod("properties2SystemEnv", Properties.class);
         method.setAccessible(true);
-        System.setProperty("rocketmq.namesrv.domain", "value");
-        method.invoke(null, properties);
-        Assert.assertEquals("value", System.getProperty("rocketmq.namesrv.domain"));
+        {
+            properties.put("rmqAddressServerDomain", "value1");
+            properties.put("rmqAddressServerSubGroup", "value2");
+            method.invoke(null, properties);
+            Assert.assertEquals("value1", System.getProperty("rocketmq.namesrv.domain"));
+            Assert.assertEquals("value2", System.getProperty("rocketmq.namesrv.domain.subgroup"));
+        }
+        {
+            properties.put("rmqAddressServerDomain", MixAll.WS_DOMAIN_NAME);
+            properties.put("rmqAddressServerSubGroup", MixAll.WS_DOMAIN_SUBGROUP);
+            method.invoke(null, properties);
+            Assert.assertEquals(MixAll.WS_DOMAIN_NAME, System.getProperty("rocketmq.namesrv.domain"));
+            Assert.assertEquals(MixAll.WS_DOMAIN_SUBGROUP, System.getProperty("rocketmq.namesrv.domain.subgroup"));
+        }
+
+
     }
 }
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 3770dae..2141f2c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -101,7 +101,7 @@ public class AdminBrokerProcessorTest {
     public void init() throws Exception {
         brokerController.setMessageStore(messageStore);
 
-        doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor();
+        //doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor();
 
         adminBrokerProcessor = new AdminBrokerProcessor(brokerController);
 
@@ -203,7 +203,7 @@ public class AdminBrokerProcessorTest {
             RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, requestHeader);
             request.makeCustomHeaderToNet();
             RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
-            assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+            assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
             assertThat(response.getRemark()).contains("No topic in this broker.");
         }
     }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
deleted file mode 100644
index 4a60437..0000000
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.topic;
-
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.nio.file.Files;
-import java.nio.file.Paths;
-
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class TopicConfigManagerTest {
-    @Mock
-    private DefaultMessageStore messageStore;
-    @Mock
-    private BrokerController brokerController;
-
-    private TopicConfigManager topicConfigManager;
-
-    private static final String topic = "FooBar";
-    private static final String broker1Name = "broker1";
-    private static final String broker1Addr = "127.0.0.1:12345";
-    private static final int queueId1 = 1;
-    private static final String broker2Name = "broker2";
-    private static final String broker2Addr = "127.0.0.2:12345";
-    private static final int queueId2 = 2;
-
-    @Before
-    public void before() {
-        BrokerConfig brokerConfig = new BrokerConfig();
-        brokerConfig.setBrokerName(broker1Name);
-        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
-
-        when(brokerController.getMessageStore()).thenReturn(messageStore);
-
-        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
-        messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir"));
-        when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
-
-        this.topicConfigManager = new TopicConfigManager(brokerController);
-        this.topicConfigManager.getTopicConfigTable().put(topic, new TopicConfig(topic));
-    }
-
-    @After
-    public void after() throws Exception {
-        if (topicConfigManager != null) {
-            Files.deleteIfExists(Paths.get(topicConfigManager.configFilePath()));
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 0c15cff..8f0138d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1109,26 +1109,14 @@ public class MQClientAPIImpl {
 
     public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
-        return getMaxOffset(addr, topic, queueId, true, false, timeoutMillis);
-    }
-
-    public long getMaxOffset(final String addr, final String topic, final int queueId, boolean committed,
-        boolean fromLogicalQueue,
-        final long timeoutMillis)
-        throws RemotingException, MQBrokerException, InterruptedException {
         GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
-        requestHeader.setCommitted(committed);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
             request, timeoutMillis);
         assert response != null;
-        HashMap<String, String> extFields = response.getExtFields();
-        if (extFields != null && extFields.containsKey(MessageConst.PROPERTY_REDIRECT)) {
-            throw new MQRedirectException(response.getBody());
-        }
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GetMaxOffsetResponseHeader responseHeader =
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 793189e..0181951 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1020,7 +1020,8 @@ public class MQClientInstance {
 
 
     public String getBrokerNameFromMessageQueue(final MessageQueue mq) {
-        if (topicEndPointsTable.get(mq.getTopic()) != null
+        if (topicEndPointsTable != null
+            && topicEndPointsTable.get(mq.getTopic()) != null
             && !topicEndPointsTable.get(mq.getTopic()).isEmpty()) {
             return topicEndPointsTable.get(mq.getTopic()).get(mq);
         }
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index f762910..73cfefb 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -20,10 +20,12 @@ import java.util.Collections;
 import java.util.HashSet;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.OffsetNotFoundException;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -60,6 +62,7 @@ public class RemoteBrokerOffsetStoreTest {
         when(mQClientFactory.getClientId()).thenReturn(clientId);
         when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
         when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
+        when(mQClientFactory.getBrokerNameFromMessageQueue(any())).thenReturn(brokerName);
     }
 
     @Test
@@ -84,10 +87,15 @@ public class RemoteBrokerOffsetStoreTest {
 
         offsetStore.updateOffset(messageQueue, 1024, false);
 
-        doThrow(new MQBrokerException(-1, "", null))
+        doThrow(new OffsetNotFoundException(ResponseCode.PULL_NOT_FOUND, "", null))
             .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
         assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
 
+
+        doThrow(new MQBrokerException(-1, "", null))
+            .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-2);
+
         doThrow(new RemotingException("", null))
             .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
         assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-2);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index 2a577d7..e961af9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -29,7 +29,6 @@ public class GetMaxOffsetRequestHeader extends TopicQueueRequestHeader {
     private String topic;
     @CFNotNull
     private Integer queueId;
-    private boolean committed;
 
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -55,11 +54,4 @@ public class GetMaxOffsetRequestHeader extends TopicQueueRequestHeader {
         this.queueId = queueId;
     }
 
-    public void setCommitted(boolean committed) {
-        this.committed = committed;
-    }
-
-    public boolean isCommitted() {
-        return committed;
-    }
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java b/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java
index a884b6a..a61ec4c 100644
--- a/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/ConfigManagerTest.java
@@ -15,13 +15,10 @@ package org.apache.rocketmq.common;/*
  * limitations under the License.
  */
 
-import org.apache.rocketmq.common.ConfigManager;
-import org.apache.rocketmq.common.MixAll;
-import org.junit.Test;
-
 import java.io.File;
 import java.io.PrintWriter;
 import java.lang.reflect.Method;
+import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index f2f6935..a0fc765 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -209,7 +209,11 @@ public class RemotingCommandTest {
         SubExtFieldsHeader subExtFieldsHeader = new SubExtFieldsHeader();
         RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(1, subExtFieldsHeader);
         Field[] fields  = remotingCommand.getClazzFields(subExtFieldsHeader.getClass());
-        Assert.assertEquals(7, fields.length);
+        Set<String> fieldNames = new HashSet<>();
+        for (Field field: fields) {
+            fieldNames.add(field.getName());
+        }
+        Assert.assertTrue(fields.length >= 7);
         Set<String> names = new HashSet<>();
         names.add("stringValue");
         names.add("intValue");
@@ -218,8 +222,8 @@ public class RemotingCommandTest {
         names.add("doubleValue");
         names.add("name");
         names.add("value");
-        for (Field field : fields) {
-            Assert.assertTrue(names.contains(field.getName()));
+        for (String name: names) {
+            Assert.assertTrue(fieldNames.contains(name));
         }
         remotingCommand.makeCustomHeaderToNet();
         SubExtFieldsHeader other = (SubExtFieldsHeader) remotingCommand.decodeCommandCustomHeader(subExtFieldsHeader.getClass());
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index deb3d05..4514ef4 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -16,6 +16,18 @@
  */
 package org.apache.rocketmq.tools.admin;
 
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -52,6 +64,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -66,25 +79,11 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -248,7 +247,8 @@ public class DefaultMQAdminExtTest {
                 put("topic_test_examine_topicConfig", new TopicConfig("topic_test_examine_topicConfig"));
             }
         });
-        when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper);
+        //when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper);
+        when(mQClientAPIImpl.getTopicConfig(anyString(), anyString(), anyLong())).thenReturn(new TopicConfigAndQueueMapping(new TopicConfig("topic_test_examine_topicConfig"), null));
     }
 
     @AfterClass
@@ -439,7 +439,7 @@ public class DefaultMQAdminExtTest {
 
     @Test
     public void testMaxOffset() throws Exception {
-        when(mQClientAPIImpl.getMaxOffset(anyString(), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(100L);
+        when(mQClientAPIImpl.getMaxOffset(anyString(), anyString(), anyInt(), anyLong())).thenReturn(100L);
 
         assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, broker1Name, 0))).isEqualTo(100L);
     }
@@ -451,19 +451,8 @@ public class DefaultMQAdminExtTest {
         assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, broker1Name, 0), System.currentTimeMillis())).isEqualTo(101L);
     }
 
-    @Test
-    public void testMaxOffset_LogicalQueue() throws Exception {
-        when(mQClientAPIImpl.getMaxOffset(eq(broker2Addr), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(10L);
 
-        assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX, 0))).isEqualTo(1010L);
-    }
 
-    @Test
-    public void testSearchOffset_LogicalQueue() throws Exception {
-        when(mQClientAPIImpl.searchOffset(eq(broker2Addr), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(11L);
-
-        assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX, 0), System.currentTimeMillis())).isEqualTo(1011L);
-    }
 
     @Test
     public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException {
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
index 1154395..25aa3f8 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -113,7 +113,7 @@ public class ConsumeMessageCommandTest {
     @Test
     public void testExecuteDefaultWhenPullMessageByQueueGotException() throws SubCommandException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, IllegalAccessException {
         DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
-        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
+        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(MQClientException.class);
         Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
         producerField.setAccessible(true);
         producerField.set(consumeMessageCommand, defaultMQPullConsumer);
@@ -135,7 +135,7 @@ public class ConsumeMessageCommandTest {
     @Test
     public void testExecuteByConditionWhenPullMessageByQueueGotException() throws IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException {
         DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
-        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
+        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(MQClientException.class);
         Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
         producerField.setAccessible(true);
         producerField.set(consumeMessageCommand, defaultMQPullConsumer);