You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/05/19 03:41:29 UTC

[rocketmq] branch develop updated: Fix unit test stability

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1350c3f  Fix unit test stability
     new 5d21257  Merge pull request #2903 from ayanamist/fix-test
1350c3f is described below

commit 1350c3f37248ba0631a4d55c40de4ac3e433add0
Author: ayanamist <ay...@gmail.com>
AuthorDate: Tue May 18 15:47:23 2021 +0800

    Fix unit test stability
    
    Bump mockito-core to 3.10.0, remove powermock dependency, suppress useless logging
---
 .../consumer/DefaultLitePullConsumerTest.java      | 60 ++++++++++++------
 .../client/consumer/DefaultMQPushConsumerTest.java | 67 +++++++++++---------
 .../ConsumeMessageConcurrentlyServiceTest.java     | 42 +++++++------
 .../client/impl/factory/MQClientInstanceTest.java  |  4 +-
 .../DefaultMQConsumerWithOpenTracingTest.java      | 72 +++++++++++++---------
 .../trace/DefaultMQConsumerWithTraceTest.java      | 44 ++++++++-----
 client/src/test/resources/log4j2.xml               | 29 +++++++++
 .../OpenTracingTransactionProducer.java            |  2 +-
 .../apache/rocketmq/logging/inner/SysLogger.java   |  4 +-
 pom.xml                                            | 17 +----
 10 files changed, 211 insertions(+), 130 deletions(-)

diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 5b7e635..04b760e 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -20,8 +20,12 @@ package org.apache.rocketmq.client.consumer;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
-import java.util.*;
-
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
@@ -46,17 +50,15 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
@@ -70,9 +72,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultLitePullConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
 public class DefaultLitePullConsumerTest {
     @Spy
     private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@@ -94,7 +94,10 @@ public class DefaultLitePullConsumerTest {
 
     @Before
     public void init() throws Exception {
-        PowerMockito.suppress(PowerMockito.method(DefaultLitePullConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
+        ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+        factoryTable.forEach((s, instance) -> instance.shutdown());
+        factoryTable.clear();
+
         Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
         field.setAccessible(true);
         RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
@@ -182,7 +185,9 @@ public class DefaultLitePullConsumerTest {
         when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
         when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
         MessageQueue messageQueue = createMessageQueue();
-        litePullConsumer.assign(Collections.singletonList(messageQueue));
+        List<MessageQueue> messageQueues = Collections.singletonList(messageQueue);
+        litePullConsumer.assign(messageQueues);
+        litePullConsumer.pause(messageQueues);
         long offset = litePullConsumer.committed(messageQueue);
         litePullConsumer.seek(messageQueue, offset);
         Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
@@ -198,7 +203,9 @@ public class DefaultLitePullConsumerTest {
         when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
         when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
         MessageQueue messageQueue = createMessageQueue();
-        litePullConsumer.assign(Collections.singletonList(messageQueue));
+        List<MessageQueue> messageQueues = Collections.singletonList(messageQueue);
+        litePullConsumer.assign(messageQueues);
+        litePullConsumer.pause(messageQueues);
         litePullConsumer.seekToBegin(messageQueue);
         Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
         field.setAccessible(true);
@@ -213,7 +220,9 @@ public class DefaultLitePullConsumerTest {
         when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
         when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
         MessageQueue messageQueue = createMessageQueue();
-        litePullConsumer.assign(Collections.singletonList(messageQueue));
+        List<MessageQueue> messageQueues = Collections.singletonList(messageQueue);
+        litePullConsumer.assign(messageQueues);
+        litePullConsumer.pause(messageQueues);
         litePullConsumer.seekToEnd(messageQueue);
         Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
         field.setAccessible(true);
@@ -228,7 +237,9 @@ public class DefaultLitePullConsumerTest {
         when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
         when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
         MessageQueue messageQueue = createMessageQueue();
-        litePullConsumer.assign(Collections.singletonList(messageQueue));
+        List<MessageQueue> messageQueues = Collections.singletonList(messageQueue);
+        litePullConsumer.assign(messageQueues);
+        litePullConsumer.pause(messageQueues);
         try {
             litePullConsumer.seek(messageQueue, -1);
             failBecauseExceptionWasNotThrown(MQClientException.class);
@@ -517,9 +528,6 @@ public class DefaultLitePullConsumerTest {
     public void testConsumerAfterShutdown() throws Exception {
         DefaultLitePullConsumer defaultLitePullConsumer = createSubscribeLitePullConsumer();
 
-        DefaultLitePullConsumer mockConsumer = spy(defaultLitePullConsumer);
-        when(mockConsumer.poll(anyLong())).thenReturn(new ArrayList<>());
-
         new AsyncConsumer().executeAsync(defaultLitePullConsumer);
 
         Thread.sleep(100);
@@ -576,9 +584,9 @@ public class DefaultLitePullConsumerTest {
 
         when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
             anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
-            .thenAnswer(new Answer<Object>() {
+            .thenAnswer(new Answer<PullResult>() {
                 @Override
-                public Object answer(InvocationOnMock mock) throws Throwable {
+                public PullResult answer(InvocationOnMock mock) throws Throwable {
                     PullMessageRequestHeader requestHeader = mock.getArgument(1);
                     MessageClientExt messageClientExt = new MessageClientExt();
                     messageClientExt.setTopic(topic);
@@ -604,6 +612,7 @@ public class DefaultLitePullConsumerTest {
         DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
         litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
         litePullConsumer.subscribe(topic, "*");
+        suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
         litePullConsumer.start();
         initDefaultLitePullConsumer(litePullConsumer);
         return litePullConsumer;
@@ -612,6 +621,7 @@ public class DefaultLitePullConsumerTest {
     private DefaultLitePullConsumer createStartLitePullConsumer() throws Exception {
         DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
         litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+        suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
         litePullConsumer.start();
         initDefaultLitePullConsumer(litePullConsumer);
         return litePullConsumer;
@@ -627,6 +637,7 @@ public class DefaultLitePullConsumerTest {
         litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
         litePullConsumer.setMessageModel(MessageModel.BROADCASTING);
         litePullConsumer.subscribe(topic, "*");
+        suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
         litePullConsumer.start();
         initDefaultLitePullConsumer(litePullConsumer);
         return litePullConsumer;
@@ -648,4 +659,15 @@ public class DefaultLitePullConsumerTest {
         }
         return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
     }
+
+    private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
+        DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true);
+        if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
+            litePullConsumer.changeInstanceNameToPID();
+        }
+        MQClientInstance mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer, (RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook", true)));
+        ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+        factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
+        doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+    }
 }
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 924ee12..48d9b3a 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -23,9 +23,12 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
@@ -37,6 +40,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
 import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -45,13 +49,14 @@ import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
 import org.apache.rocketmq.client.impl.consumer.PullMessageService;
 import org.apache.rocketmq.client.impl.consumer.PullRequest;
 import org.apache.rocketmq.client.impl.consumer.PullResultExt;
-import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
+import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
 import org.junit.Assert;
@@ -60,11 +65,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
@@ -77,9 +79,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultMQPushConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
 public class DefaultMQPushConsumerTest {
     private String consumerGroup;
     private String topic = "FooBar";
@@ -89,11 +89,15 @@ public class DefaultMQPushConsumerTest {
     @Mock
     private MQClientAPIImpl mQClientAPIImpl;
     private PullAPIWrapper pullAPIWrapper;
-    private RebalancePushImpl rebalancePushImpl;
+    private RebalanceImpl rebalanceImpl;
     private DefaultMQPushConsumer pushConsumer;
 
     @Before
     public void init() throws Exception {
+        ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+        factoryTable.forEach((s, instance) -> instance.shutdown());
+        factoryTable.clear();
+
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
         pushConsumer = new DefaultMQPushConsumer(consumerGroup);
         pushConsumer.setNamesrvAddr("127.0.0.1:9876");
@@ -108,16 +112,21 @@ public class DefaultMQPushConsumerTest {
         });
 
         DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
-        PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
-        rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
+
+        // Suppress updateTopicRouteInfoFromNameServer: register a spied MQClientInstance in factoryTable and stub the call to return false.
+        pushConsumer.changeInstanceNameToPID();
+        mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)));
+        factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
+        doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+
+        rebalanceImpl = spy(pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl());
         Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
         field.setAccessible(true);
-        field.set(pushConsumerImpl, rebalancePushImpl);
+        field.set(pushConsumerImpl, rebalanceImpl);
 
         pushConsumer.subscribe(topic, "*");
         pushConsumer.start();
 
-        mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
         field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
         field.setAccessible(true);
         field.set(pushConsumerImpl, mQClientFactory);
@@ -131,14 +140,13 @@ public class DefaultMQPushConsumerTest {
         field.setAccessible(true);
         field.set(pushConsumerImpl, pullAPIWrapper);
 
-        pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
         mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
 
         when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
             anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
-            .thenAnswer(new Answer<Object>() {
+            .thenAnswer(new Answer<PullResult>() {
                 @Override
-                public Object answer(InvocationOnMock mock) throws Throwable {
+                public PullResult answer(InvocationOnMock mock) throws Throwable {
                     PullMessageRequestHeader requestHeader = mock.getArgument(1);
                     MessageClientExt messageClientExt = new MessageClientExt();
                     messageClientExt.setTopic(topic);
@@ -155,11 +163,10 @@ public class DefaultMQPushConsumerTest {
             });
 
         doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
-        doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
         Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
         messageQueueSet.add(createPullRequest().getMessageQueue());
         pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
-        doReturn(123L).when(rebalancePushImpl).computePullFromWhere(any(MessageQueue.class));
+        doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class));
     }
 
     @After
@@ -175,12 +182,12 @@ public class DefaultMQPushConsumerTest {
     @Test
     public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final MessageExt[] messageExts = new MessageExt[1];
+        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
         pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                 ConsumeConcurrentlyContext context) {
-                messageExts[0] = msgs.get(0);
+                messageAtomic.set(msgs.get(0));
                 countDownLatch.countDown();
                 return null;
             }
@@ -188,20 +195,22 @@ public class DefaultMQPushConsumerTest {
 
         PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
         pullMessageService.executePullRequestImmediately(createPullRequest());
-        countDownLatch.await();
-        assertThat(messageExts[0].getTopic()).isEqualTo(topic);
-        assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+        countDownLatch.await(10, TimeUnit.SECONDS);
+        MessageExt msg = messageAtomic.get();
+        assertThat(msg).isNotNull();
+        assertThat(msg.getTopic()).isEqualTo(topic);
+        assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
     }
 
     @Test
     public void testPullMessage_SuccessWithOrderlyService() throws Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final MessageExt[] messageExts = new MessageExt[1];
+        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
 
         MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
             @Override
             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
-                messageExts[0] = msgs.get(0);
+                messageAtomic.set(msgs.get(0));
                 countDownLatch.countDown();
                 return null;
             }
@@ -214,8 +223,10 @@ public class DefaultMQPushConsumerTest {
         pullMessageService.executePullRequestLater(createPullRequest(), 100);
 
         countDownLatch.await(10, TimeUnit.SECONDS);
-        assertThat(messageExts[0].getTopic()).isEqualTo(topic);
-        assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+        MessageExt msg = messageAtomic.get();
+        assertThat(msg).isNotNull();
+        assertThat(msg.getTopic()).isEqualTo(topic);
+        assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
     }
 
     @Test
@@ -281,7 +292,7 @@ public class DefaultMQPushConsumerTest {
 
         PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
         pullMessageService.executePullRequestImmediately(createPullRequest());
-        countDownLatch.await();
+        assertThat(countDownLatch.await(30, TimeUnit.SECONDS)).isTrue();
 
         pushConsumer.shutdown();
         assertThat(messageConsumedFlag.get()).isTrue();
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index 921743c..e8feb80 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -16,7 +16,19 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
-import org.apache.rocketmq.client.consumer.*;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@@ -40,23 +52,15 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Spy;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
-import java.io.ByteArrayOutputStream;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
@@ -117,9 +121,9 @@ public class ConsumeMessageConcurrentlyServiceTest {
 
         when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
                 anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
-                .thenAnswer(new Answer<Object>() {
+                .thenAnswer(new Answer<PullResult>() {
                     @Override
-                    public Object answer(InvocationOnMock mock) throws Throwable {
+                    public PullResult answer(InvocationOnMock mock) throws Throwable {
                         PullMessageRequestHeader requestHeader = mock.getArgument(1);
                         MessageClientExt messageClientExt = new MessageClientExt();
                         messageClientExt.setTopic(topic);
@@ -145,13 +149,13 @@ public class ConsumeMessageConcurrentlyServiceTest {
     @Test
     public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final MessageExt[] messageExts = new MessageExt[1];
+        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
 
         ConsumeMessageConcurrentlyService  normalServie = new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                             ConsumeConcurrentlyContext context) {
-                messageExts[0] = msgs.get(0);
+                messageAtomic.set(msgs.get(0));
                 countDownLatch.countDown();
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
@@ -175,8 +179,10 @@ public class ConsumeMessageConcurrentlyServiceTest {
         StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + pushConsumer.getDefaultMQPushConsumerImpl().groupName());
 
         assertThat(item.getValue().get()).isGreaterThan(0L);
-        assertThat(messageExts[0].getTopic()).isEqualTo(topic);
-        assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+        MessageExt msg = messageAtomic.get();
+        assertThat(msg).isNotNull();
+        assertThat(msg.getTopic()).isEqualTo(topic);
+        assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
     }
 
     @After
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index d8f1e18..d37844c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
 import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -39,7 +40,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.internal.util.reflection.FieldSetter;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -55,7 +55,7 @@ public class MQClientInstanceTest {
 
     @Before
     public void init() throws Exception {
-        FieldSetter.setField(mqClientInstance, MQClientInstance.class.getDeclaredField("brokerAddrTable"), brokerAddrTable);
+        FieldUtils.writeDeclaredField(mqClientInstance, "brokerAddrTable", brokerAddrTable, true);
     }
 
     @Test
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
index 16a3d02..c173b8e 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
@@ -20,6 +20,18 @@ package org.apache.rocketmq.client.trace;
 import io.opentracing.mock.MockSpan;
 import io.opentracing.mock.MockTracer;
 import io.opentracing.tag.Tags;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -27,11 +39,14 @@ import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
 import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
@@ -47,28 +62,17 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.ByteArrayOutputStream;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
@@ -80,9 +84,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultMQPushConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
 public class DefaultMQConsumerWithOpenTracingTest {
     private String consumerGroup;
 
@@ -99,6 +101,10 @@ public class DefaultMQConsumerWithOpenTracingTest {
 
     @Before
     public void init() throws Exception {
+        ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+        factoryTable.forEach((s, instance) -> instance.shutdown());
+        factoryTable.clear();
+
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
         pushConsumer = new DefaultMQPushConsumer(consumerGroup);
         pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
@@ -106,6 +112,10 @@ public class DefaultMQConsumerWithOpenTracingTest {
         pushConsumer.setNamesrvAddr("127.0.0.1:9876");
         pushConsumer.setPullInterval(60 * 1000);
 
+        OffsetStore offsetStore = Mockito.mock(OffsetStore.class);
+        Mockito.when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(0L);
+        pushConsumer.setOffsetStore(offsetStore);
+
         pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
@@ -114,8 +124,14 @@ public class DefaultMQConsumerWithOpenTracingTest {
             }
         });
 
-        PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
         DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+
+        // suppress updateTopicRouteInfoFromNameServer
+        pushConsumer.changeInstanceNameToPID();
+        mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)));
+        factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
+        doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+
         rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
         Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
         field.setAccessible(true);
@@ -124,8 +140,6 @@ public class DefaultMQConsumerWithOpenTracingTest {
 
         pushConsumer.start();
 
-        mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
-
         field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
         field.setAccessible(true);
         field.set(pushConsumerImpl, mQClientFactory);
@@ -142,11 +156,11 @@ public class DefaultMQConsumerWithOpenTracingTest {
         pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
         mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
 
-        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+        when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
                 anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
-                .thenAnswer(new Answer<Object>() {
+                .thenAnswer(new Answer<PullResult>() {
                     @Override
-                    public Object answer(InvocationOnMock mock) throws Throwable {
+                    public PullResult answer(InvocationOnMock mock) throws Throwable {
                         PullMessageRequestHeader requestHeader = mock.getArgument(1);
                         MessageClientExt messageClientExt = new MessageClientExt();
                         messageClientExt.setTopic(topic);
@@ -176,12 +190,12 @@ public class DefaultMQConsumerWithOpenTracingTest {
     @Test
     public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final MessageExt[] messageExts = new MessageExt[1];
+        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
         pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                             ConsumeConcurrentlyContext context) {
-                messageExts[0] = msgs.get(0);
+                messageAtomic.set(msgs.get(0));
                 countDownLatch.countDown();
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
@@ -189,9 +203,11 @@ public class DefaultMQConsumerWithOpenTracingTest {
 
         PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
         pullMessageService.executePullRequestImmediately(createPullRequest());
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
-        assertThat(messageExts[0].getTopic()).isEqualTo(topic);
-        assertThat(messageExts[0].getBody()).isEqualTo(new byte[]{'a'});
+        countDownLatch.await(30, TimeUnit.SECONDS);
+        MessageExt msg = messageAtomic.get();
+        assertThat(msg).isNotNull();
+        assertThat(msg.getTopic()).isEqualTo(topic);
+        assertThat(msg.getBody()).isEqualTo(new byte[]{'a'});
 
         assertThat(tracer.finishedSpans().size()).isEqualTo(1);
         MockSpan span = tracer.finishedSpans().get(0);
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index c54c0bb..aec7d2c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -26,8 +26,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -40,6 +43,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
 import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
@@ -53,17 +57,16 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
 import org.junit.Before;
@@ -71,11 +74,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
@@ -87,9 +87,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultMQPushConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
 public class DefaultMQConsumerWithTraceTest {
     private String consumerGroup;
     private String consumerGroupNormal;
@@ -116,6 +114,10 @@ public class DefaultMQConsumerWithTraceTest {
 
     @Before
     public void init() throws Exception {
+        ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+        factoryTable.forEach((s, instance) -> instance.shutdown());
+        factoryTable.clear();
+
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
         pushConsumer = new DefaultMQPushConsumer(consumerGroup, true, "");
         consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
@@ -135,8 +137,14 @@ public class DefaultMQConsumerWithTraceTest {
             }
         });
 
-        PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
         DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+
+        // suppress updateTopicRouteInfoFromNameServer
+        pushConsumer.changeInstanceNameToPID();
+        mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)));
+        factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
+        doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+
         rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
         Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
         field.setAccessible(true);
@@ -174,9 +182,9 @@ public class DefaultMQConsumerWithTraceTest {
 
         when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
             anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
-            .thenAnswer(new Answer<Object>() {
+            .thenAnswer(new Answer<PullResult>() {
                 @Override
-                public Object answer(InvocationOnMock mock) throws Throwable {
+                public PullResult answer(InvocationOnMock mock) throws Throwable {
                     PullMessageRequestHeader requestHeader = mock.getArgument(1);
                     MessageClientExt messageClientExt = new MessageClientExt();
                     messageClientExt.setTopic(topic);
@@ -208,12 +216,12 @@ public class DefaultMQConsumerWithTraceTest {
         traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
 
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final MessageExt[] messageExts = new MessageExt[1];
+        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
         pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                 ConsumeConcurrentlyContext context) {
-                messageExts[0] = msgs.get(0);
+                messageAtomic.set(msgs.get(0));
                 countDownLatch.countDown();
                 return null;
             }
@@ -221,9 +229,11 @@ public class DefaultMQConsumerWithTraceTest {
 
         PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
         pullMessageService.executePullRequestImmediately(createPullRequest());
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
-        assertThat(messageExts[0].getTopic()).isEqualTo(topic);
-        assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+        countDownLatch.await(30, TimeUnit.SECONDS);
+        MessageExt msg = messageAtomic.get();
+        assertThat(msg).isNotNull();
+        assertThat(msg.getTopic()).isEqualTo(topic);
+        assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
     }
 
     private PullRequest createPullRequest() {
diff --git a/client/src/test/resources/log4j2.xml b/client/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..52cf2a8
--- /dev/null
+++ b/client/src/test/resources/log4j2.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="ERROR">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
index 428640a..514f3ce 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
@@ -57,7 +57,7 @@ public class OpenTracingTransactionProducer {
 
         try {
             Message msg = new Message("TopicTest", "Tag", "KEY",
-                    ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
+                    "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
             SendResult sendResult = producer.sendMessageInTransaction(msg, null);
             System.out.printf("%s%n", sendResult);
         } catch (MQClientException | UnsupportedEncodingException e) {
diff --git a/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java b/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java
index b6d1049..aaba4d6 100755
--- a/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java
+++ b/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java
@@ -33,13 +33,13 @@ public class SysLogger {
 
     public static void debug(String msg) {
         if (debugEnabled && !quietMode) {
-            System.out.printf("%s", PREFIX + msg);
+            System.err.println(PREFIX + msg);
         }
     }
 
     public static void debug(String msg, Throwable t) {
         if (debugEnabled && !quietMode) {
-            System.out.printf("%s", PREFIX + msg);
+            System.err.println(PREFIX + msg);
             if (t != null) {
                 t.printStackTrace(System.out);
             }
diff --git a/pom.xml b/pom.xml
index 05878e5..3714eb3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,6 @@
         <!-- Exclude all generated code -->
         <sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
         <sonar.exclusions>file:**/generated-sources/**,**/test/**</sonar.exclusions>
-        <powermock.version>2.0.2</powermock.version>
 
     </properties>
 
@@ -425,7 +424,7 @@
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.11</version>
+            <version>4.13.2</version>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -437,19 +436,7 @@
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
-            <version>2.23.0</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-module-junit4</artifactId>
-            <version>${powermock.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-api-mockito2</artifactId>
-            <version>${powermock.version}</version>
+            <version>3.10.0</version>
             <scope>test</scope>
         </dependency>
     </dependencies>