You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/05/19 03:41:29 UTC
[rocketmq] branch develop updated: Fix unit test stability
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1350c3f Fix unit test stability
new 5d21257 Merge pull request #2903 from ayanamist/fix-test
1350c3f is described below
commit 1350c3f37248ba0631a4d55c40de4ac3e433add0
Author: ayanamist <ay...@gmail.com>
AuthorDate: Tue May 18 15:47:23 2021 +0800
Fix unit test stability
Bump mockito-core to 3.10.0, remove powermock dependency, suppress useless logging
---
.../consumer/DefaultLitePullConsumerTest.java | 60 ++++++++++++------
.../client/consumer/DefaultMQPushConsumerTest.java | 67 +++++++++++---------
.../ConsumeMessageConcurrentlyServiceTest.java | 42 +++++++------
.../client/impl/factory/MQClientInstanceTest.java | 4 +-
.../DefaultMQConsumerWithOpenTracingTest.java | 72 +++++++++++++---------
.../trace/DefaultMQConsumerWithTraceTest.java | 44 ++++++++-----
client/src/test/resources/log4j2.xml | 29 +++++++++
.../OpenTracingTransactionProducer.java | 2 +-
.../apache/rocketmq/logging/inner/SysLogger.java | 4 +-
pom.xml | 17 +----
10 files changed, 211 insertions(+), 130 deletions(-)
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 5b7e635..04b760e 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -20,8 +20,12 @@ package org.apache.rocketmq.client.consumer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
-import java.util.*;
-
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
@@ -46,17 +50,15 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.RPCHook;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
@@ -70,9 +72,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultLitePullConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
public class DefaultLitePullConsumerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@@ -94,7 +94,10 @@ public class DefaultLitePullConsumerTest {
@Before
public void init() throws Exception {
- PowerMockito.suppress(PowerMockito.method(DefaultLitePullConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
+ ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+ factoryTable.forEach((s, instance) -> instance.shutdown());
+ factoryTable.clear();
+
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
field.setAccessible(true);
RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
@@ -182,7 +185,9 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
MessageQueue messageQueue = createMessageQueue();
- litePullConsumer.assign(Collections.singletonList(messageQueue));
+ List<MessageQueue> messageQueues = Collections.singletonList(messageQueue);
+ litePullConsumer.assign(messageQueues);
+ litePullConsumer.pause(messageQueues);
long offset = litePullConsumer.committed(messageQueue);
litePullConsumer.seek(messageQueue, offset);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
@@ -198,7 +203,9 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
MessageQueue messageQueue = createMessageQueue();
- litePullConsumer.assign(Collections.singletonList(messageQueue));
+ List<MessageQueue> messageQueues = Collections.singletonList(messageQueue);
+ litePullConsumer.assign(messageQueues);
+ litePullConsumer.pause(messageQueues);
litePullConsumer.seekToBegin(messageQueue);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
@@ -213,7 +220,9 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
MessageQueue messageQueue = createMessageQueue();
- litePullConsumer.assign(Collections.singletonList(messageQueue));
+ List<MessageQueue> messageQueues = Collections.singletonList(messageQueue);
+ litePullConsumer.assign(messageQueues);
+ litePullConsumer.pause(messageQueues);
litePullConsumer.seekToEnd(messageQueue);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
@@ -228,7 +237,9 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
MessageQueue messageQueue = createMessageQueue();
- litePullConsumer.assign(Collections.singletonList(messageQueue));
+ List<MessageQueue> messageQueues = Collections.singletonList(messageQueue);
+ litePullConsumer.assign(messageQueues);
+ litePullConsumer.pause(messageQueues);
try {
litePullConsumer.seek(messageQueue, -1);
failBecauseExceptionWasNotThrown(MQClientException.class);
@@ -517,9 +528,6 @@ public class DefaultLitePullConsumerTest {
public void testConsumerAfterShutdown() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createSubscribeLitePullConsumer();
- DefaultLitePullConsumer mockConsumer = spy(defaultLitePullConsumer);
- when(mockConsumer.poll(anyLong())).thenReturn(new ArrayList<>());
-
new AsyncConsumer().executeAsync(defaultLitePullConsumer);
Thread.sleep(100);
@@ -576,9 +584,9 @@ public class DefaultLitePullConsumerTest {
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
- .thenAnswer(new Answer<Object>() {
+ .thenAnswer(new Answer<PullResult>() {
@Override
- public Object answer(InvocationOnMock mock) throws Throwable {
+ public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
@@ -604,6 +612,7 @@ public class DefaultLitePullConsumerTest {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*");
+ suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer;
@@ -612,6 +621,7 @@ public class DefaultLitePullConsumerTest {
private DefaultLitePullConsumer createStartLitePullConsumer() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+ suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer;
@@ -627,6 +637,7 @@ public class DefaultLitePullConsumerTest {
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.setMessageModel(MessageModel.BROADCASTING);
litePullConsumer.subscribe(topic, "*");
+ suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer;
@@ -648,4 +659,15 @@ public class DefaultLitePullConsumerTest {
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
+
+ private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
+ DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true);
+ if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
+ litePullConsumer.changeInstanceNameToPID();
+ }
+ MQClientInstance mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer, (RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook", true)));
+ ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+ factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
+ doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+ }
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 924ee12..48d9b3a 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -23,9 +23,12 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
@@ -37,6 +40,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -45,13 +49,14 @@ import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
-import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
+import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Assert;
@@ -60,11 +65,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
@@ -77,9 +79,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultMQPushConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
@@ -89,11 +89,15 @@ public class DefaultMQPushConsumerTest {
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
- private RebalancePushImpl rebalancePushImpl;
+ private RebalanceImpl rebalanceImpl;
private DefaultMQPushConsumer pushConsumer;
@Before
public void init() throws Exception {
+ ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+ factoryTable.forEach((s, instance) -> instance.shutdown());
+ factoryTable.clear();
+
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
@@ -108,16 +112,21 @@ public class DefaultMQPushConsumerTest {
});
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
- PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
- rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
+
+ // suppress updateTopicRouteInfoFromNameServer
+ pushConsumer.changeInstanceNameToPID();
+ mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)));
+ factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
+ doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+
+ rebalanceImpl = spy(pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl());
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
- field.set(pushConsumerImpl, rebalancePushImpl);
+ field.set(pushConsumerImpl, rebalanceImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
- mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
@@ -131,14 +140,13 @@ public class DefaultMQPushConsumerTest {
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
- pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
- .thenAnswer(new Answer<Object>() {
+ .thenAnswer(new Answer<PullResult>() {
@Override
- public Object answer(InvocationOnMock mock) throws Throwable {
+ public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
@@ -155,11 +163,10 @@ public class DefaultMQPushConsumerTest {
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
- doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
- doReturn(123L).when(rebalancePushImpl).computePullFromWhere(any(MessageQueue.class));
+ doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class));
}
@After
@@ -175,12 +182,12 @@ public class DefaultMQPushConsumerTest {
@Test
public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- final MessageExt[] messageExts = new MessageExt[1];
+ final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
- messageExts[0] = msgs.get(0);
+ messageAtomic.set(msgs.get(0));
countDownLatch.countDown();
return null;
}
@@ -188,20 +195,22 @@ public class DefaultMQPushConsumerTest {
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
- countDownLatch.await();
- assertThat(messageExts[0].getTopic()).isEqualTo(topic);
- assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ MessageExt msg = messageAtomic.get();
+ assertThat(msg).isNotNull();
+ assertThat(msg.getTopic()).isEqualTo(topic);
+ assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
}
@Test
public void testPullMessage_SuccessWithOrderlyService() throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- final MessageExt[] messageExts = new MessageExt[1];
+ final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- messageExts[0] = msgs.get(0);
+ messageAtomic.set(msgs.get(0));
countDownLatch.countDown();
return null;
}
@@ -214,8 +223,10 @@ public class DefaultMQPushConsumerTest {
pullMessageService.executePullRequestLater(createPullRequest(), 100);
countDownLatch.await(10, TimeUnit.SECONDS);
- assertThat(messageExts[0].getTopic()).isEqualTo(topic);
- assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+ MessageExt msg = messageAtomic.get();
+ assertThat(msg).isNotNull();
+ assertThat(msg.getTopic()).isEqualTo(topic);
+ assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
}
@Test
@@ -281,7 +292,7 @@ public class DefaultMQPushConsumerTest {
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
- countDownLatch.await();
+ assertThat(countDownLatch.await(30, TimeUnit.SECONDS)).isTrue();
pushConsumer.shutdown();
assertThat(messageConsumedFlag.get()).isTrue();
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index 921743c..e8feb80 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -16,7 +16,19 @@
*/
package org.apache.rocketmq.client.impl.consumer;
-import org.apache.rocketmq.client.consumer.*;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@@ -40,23 +52,15 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
-import java.io.ByteArrayOutputStream;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@@ -117,9 +121,9 @@ public class ConsumeMessageConcurrentlyServiceTest {
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
- .thenAnswer(new Answer<Object>() {
+ .thenAnswer(new Answer<PullResult>() {
@Override
- public Object answer(InvocationOnMock mock) throws Throwable {
+ public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
@@ -145,13 +149,13 @@ public class ConsumeMessageConcurrentlyServiceTest {
@Test
public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- final MessageExt[] messageExts = new MessageExt[1];
+ final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
ConsumeMessageConcurrentlyService normalServie = new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
- messageExts[0] = msgs.get(0);
+ messageAtomic.set(msgs.get(0));
countDownLatch.countDown();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
@@ -175,8 +179,10 @@ public class ConsumeMessageConcurrentlyServiceTest {
StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + pushConsumer.getDefaultMQPushConsumerImpl().groupName());
assertThat(item.getValue().get()).isGreaterThan(0L);
- assertThat(messageExts[0].getTopic()).isEqualTo(topic);
- assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+ MessageExt msg = messageAtomic.get();
+ assertThat(msg).isNotNull();
+ assertThat(msg.getTopic()).isEqualTo(topic);
+ assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
}
@After
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index d8f1e18..d37844c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -39,7 +40,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -55,7 +55,7 @@ public class MQClientInstanceTest {
@Before
public void init() throws Exception {
- FieldSetter.setField(mqClientInstance, MQClientInstance.class.getDeclaredField("brokerAddrTable"), brokerAddrTable);
+ FieldUtils.writeDeclaredField(mqClientInstance, "brokerAddrTable", brokerAddrTable, true);
}
@Test
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
index 16a3d02..c173b8e 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
@@ -20,6 +20,18 @@ package org.apache.rocketmq.client.trace;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -27,11 +39,14 @@ import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
@@ -47,28 +62,17 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.ByteArrayOutputStream;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
@@ -80,9 +84,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultMQPushConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
public class DefaultMQConsumerWithOpenTracingTest {
private String consumerGroup;
@@ -99,6 +101,10 @@ public class DefaultMQConsumerWithOpenTracingTest {
@Before
public void init() throws Exception {
+ ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+ factoryTable.forEach((s, instance) -> instance.shutdown());
+ factoryTable.clear();
+
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
@@ -106,6 +112,10 @@ public class DefaultMQConsumerWithOpenTracingTest {
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
+ OffsetStore offsetStore = Mockito.mock(OffsetStore.class);
+ Mockito.when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(0L);
+ pushConsumer.setOffsetStore(offsetStore);
+
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
@@ -114,8 +124,14 @@ public class DefaultMQConsumerWithOpenTracingTest {
}
});
- PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+
+ // suppress updateTopicRouteInfoFromNameServer
+ pushConsumer.changeInstanceNameToPID();
+ mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)));
+ factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
+ doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
@@ -124,8 +140,6 @@ public class DefaultMQConsumerWithOpenTracingTest {
pushConsumer.start();
- mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
-
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
@@ -142,11 +156,11 @@ public class DefaultMQConsumerWithOpenTracingTest {
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
- when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+ when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
- .thenAnswer(new Answer<Object>() {
+ .thenAnswer(new Answer<PullResult>() {
@Override
- public Object answer(InvocationOnMock mock) throws Throwable {
+ public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
@@ -176,12 +190,12 @@ public class DefaultMQConsumerWithOpenTracingTest {
@Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- final MessageExt[] messageExts = new MessageExt[1];
+ final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
- messageExts[0] = msgs.get(0);
+ messageAtomic.set(msgs.get(0));
countDownLatch.countDown();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
@@ -189,9 +203,11 @@ public class DefaultMQConsumerWithOpenTracingTest {
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
- assertThat(messageExts[0].getTopic()).isEqualTo(topic);
- assertThat(messageExts[0].getBody()).isEqualTo(new byte[]{'a'});
+ countDownLatch.await(30, TimeUnit.SECONDS);
+ MessageExt msg = messageAtomic.get();
+ assertThat(msg).isNotNull();
+ assertThat(msg.getTopic()).isEqualTo(topic);
+ assertThat(msg.getBody()).isEqualTo(new byte[]{'a'});
assertThat(tracer.finishedSpans().size()).isEqualTo(1);
MockSpan span = tracer.finishedSpans().get(0);
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index c54c0bb..aec7d2c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -26,8 +26,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -40,6 +43,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
@@ -53,17 +57,16 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
@@ -71,11 +74,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
@@ -87,9 +87,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultMQPushConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
public class DefaultMQConsumerWithTraceTest {
private String consumerGroup;
private String consumerGroupNormal;
@@ -116,6 +114,10 @@ public class DefaultMQConsumerWithTraceTest {
@Before
public void init() throws Exception {
+ ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+ factoryTable.forEach((s, instance) -> instance.shutdown());
+ factoryTable.clear();
+
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup, true, "");
consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
@@ -135,8 +137,14 @@ public class DefaultMQConsumerWithTraceTest {
}
});
- PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+
+ // suppress updateTopicRouteInfoFromNameServer
+ pushConsumer.changeInstanceNameToPID();
+ mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)));
+ factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
+ doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
+
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
@@ -174,9 +182,9 @@ public class DefaultMQConsumerWithTraceTest {
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
- .thenAnswer(new Answer<Object>() {
+ .thenAnswer(new Answer<PullResult>() {
@Override
- public Object answer(InvocationOnMock mock) throws Throwable {
+ public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
@@ -208,12 +216,12 @@ public class DefaultMQConsumerWithTraceTest {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
final CountDownLatch countDownLatch = new CountDownLatch(1);
- final MessageExt[] messageExts = new MessageExt[1];
+ final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
- messageExts[0] = msgs.get(0);
+ messageAtomic.set(msgs.get(0));
countDownLatch.countDown();
return null;
}
@@ -221,9 +229,11 @@ public class DefaultMQConsumerWithTraceTest {
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
- assertThat(messageExts[0].getTopic()).isEqualTo(topic);
- assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+ countDownLatch.await(30, TimeUnit.SECONDS);
+ MessageExt msg = messageAtomic.get();
+ assertThat(msg).isNotNull();
+ assertThat(msg.getTopic()).isEqualTo(topic);
+ assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
}
private PullRequest createPullRequest() {
diff --git a/client/src/test/resources/log4j2.xml b/client/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..52cf2a8
--- /dev/null
+++ b/client/src/test/resources/log4j2.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="ERROR">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
index 428640a..514f3ce 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
@@ -57,7 +57,7 @@ public class OpenTracingTransactionProducer {
try {
Message msg = new Message("TopicTest", "Tag", "KEY",
- ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
+ "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
} catch (MQClientException | UnsupportedEncodingException e) {
diff --git a/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java b/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java
index b6d1049..aaba4d6 100755
--- a/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java
+++ b/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java
@@ -33,13 +33,13 @@ public class SysLogger {
public static void debug(String msg) {
if (debugEnabled && !quietMode) {
- System.out.printf("%s", PREFIX + msg);
+ System.err.println(PREFIX + msg);
}
}
public static void debug(String msg, Throwable t) {
if (debugEnabled && !quietMode) {
- System.out.printf("%s", PREFIX + msg);
+ System.err.println(PREFIX + msg);
if (t != null) {
t.printStackTrace(System.out);
}
diff --git a/pom.xml b/pom.xml
index 05878e5..3714eb3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,6 @@
<!-- Exclude all generated code -->
<sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
<sonar.exclusions>file:**/generated-sources/**,**/test/**</sonar.exclusions>
- <powermock.version>2.0.2</powermock.version>
</properties>
@@ -425,7 +424,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
+ <version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -437,19 +436,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
- <version>2.23.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- <version>${powermock.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito2</artifactId>
- <version>${powermock.version}</version>
+ <version>3.10.0</version>
<scope>test</scope>
</dependency>
</dependencies>