You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/27 11:51:16 UTC
[rocketmq] branch develop updated: [RIP-10] Add test cases for
ConsumeMessageConcurrentlyService consume success status (#888)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new fe23415 [RIP-10] Add test cases for ConsumeMessageConcurrentlyService consume success status (#888)
fe23415 is described below
commit fe234155f0f0b2d50de42c1ed64c717d5718f744
Author: 王启团 <ha...@139.com>
AuthorDate: Wed Feb 27 19:51:07 2019 +0800
[RIP-10] Add test cases for ConsumeMessageConcurrentlyService consume success status (#888)
[RIP-10] Add test cases for ConsumeMessageConcurrentlyService consume success status
---
.../ConsumeMessageConcurrentlyServiceTest.java | 211 +++++++++++++++++++++
1 file changed, 211 insertions(+)
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
new file mode 100644
index 0000000..15a261f
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.*;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.stats.StatsItem;
+import org.apache.rocketmq.common.stats.StatsItemSet;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumeMessageConcurrentlyServiceTest {
+ private String consumerGroup; // assigned in init() as "FooBarGroup" plus the current time in millis
+ private String topic = "FooBar";
+ private String brokerName = "BrokerA";
+ private MQClientInstance mQClientFactory; // Mockito spy of the consumer's client factory, created in init()
+
+ @Mock
+ private MQClientAPIImpl mQClientAPIImpl; // mock injected into mQClientFactory by reflection in init()
+ private PullAPIWrapper pullAPIWrapper; // spy injected into the push consumer impl in init()
+ private RebalancePushImpl rebalancePushImpl; // spy injected into the push consumer impl in init()
+ private DefaultMQPushConsumer pushConsumer; // consumer under test: started in init(), shut down in terminate()
+
+ @Before
+ public void init() throws Exception { // builds a started push consumer whose collaborators are spies/mocks
+ consumerGroup = "FooBarGroup" + System.currentTimeMillis(); // timestamp suffix gives each run its own group name
+ pushConsumer = new DefaultMQPushConsumer(consumerGroup);
+ pushConsumer.setNamesrvAddr("127.0.0.1:9876");
+ pushConsumer.setPullInterval(60 * 1000);
+ // Default listener registered before start(); the test case later installs its own consume service.
+ pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ // Replace the consumer's rebalanceImpl field with a Mockito spy via reflection, then subscribe and start.
+ DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+ rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
+ Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
+ field.setAccessible(true);
+ field.set(pushConsumerImpl, rebalancePushImpl);
+ pushConsumer.subscribe(topic, "*");
+ pushConsumer.start();
+ // After start(), wrap the client factory in a spy and write it back so its calls can be stubbed below.
+ mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
+ field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
+ field.setAccessible(true);
+ field.set(pushConsumerImpl, mQClientFactory);
+ // Inject the @Mock MQClientAPIImpl into the spied factory.
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mQClientFactory, mQClientAPIImpl);
+ // Replace the pull wrapper with a spy built on the spied factory.
+ pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
+ field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
+ field.setAccessible(true);
+ field.set(pushConsumerImpl, pullAPIWrapper);
+ // Point the rebalance impl at the spied factory and register the consumer with it.
+ pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
+ mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
+ // Stub pullMessage: build one message, pass a FOUND result to the PullCallback, and return that result.
+ when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+ anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+ .thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock mock) throws Throwable {
+ PullMessageRequestHeader requestHeader = mock.getArgument(1);
+ MessageClientExt messageClientExt = new MessageClientExt();
+ messageClientExt.setTopic(topic);
+ messageClientExt.setQueueId(0);
+ messageClientExt.setMsgId("123");
+ messageClientExt.setBody(new byte[] {'a'});
+ messageClientExt.setOffsetMsgId("234");
+ messageClientExt.setBornHost(new InetSocketAddress(8080));
+ messageClientExt.setStoreHost(new InetSocketAddress(8080));
+ PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+ ((PullCallback) mock.getArgument(4)).onSuccess(pullResult); // NOTE(review): the matcher is nullable(), so a null callback would throw here
+ return pullResult;
+ }
+ });
+ // Stub broker lookup with a fixed address, then publish the single test queue as the topic's subscribe info.
+ doReturn(new FindBrokerResult("127.0.0.1:10912", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
+ Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+ messageQueueSet.add(createPullRequest().getMessageQueue());
+ pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
+ }
+
+ @Test
+ public void testPullMessage_ConsumeSuccess() throws Exception {
+ // Pulls one stubbed message through a fresh concurrent consume service and checks the consume-OK counter moved.
+ final CountDownLatch consumed = new CountDownLatch(1);
+ final MessageExt[] received = new MessageExt[1];
+ final DefaultMQPushConsumerImpl consumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+
+ ConsumeMessageConcurrentlyService service = new ConsumeMessageConcurrentlyService(consumerImpl, new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ received[0] = msgs.get(0);
+ consumed.countDown();
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ consumerImpl.setConsumeMessageService(service);
+
+ PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
+ pullMessageService.executePullRequestImmediately(createPullRequest());
+ // Bounded wait: if delivery breaks, the test fails after 30s instead of hanging the build forever.
+ assertThat(consumed.await(30, java.util.concurrent.TimeUnit.SECONDS)).isTrue();
+
+ // NOTE(review): presumably the OK counter is recorded after the listener returns; this pause is kept from the original.
+ Thread.sleep(1000);
+ // Read the topicAndGroupConsumeOKTPS counter set reflectively, naming the declaring class explicitly.
+ ConsumerStatsManager mgr = service.getConsumerStatsManager();
+ Field okTpsField = ConsumerStatsManager.class.getDeclaredField("topicAndGroupConsumeOKTPS");
+ okTpsField.setAccessible(true);
+ StatsItemSet okTps = (StatsItemSet) okTpsField.get(mgr);
+ StatsItem item = okTps.getAndCreateStatsItem(topic + "@" + consumerImpl.groupName());
+
+ assertThat(item.getValue().get()).isGreaterThan(0L);
+ assertThat(received[0].getTopic()).isEqualTo(topic);
+ assertThat(received[0].getBody()).isEqualTo(new byte[] {'a'});
+ }
+
+ @After
+ public void terminate() { // runs after each test
+ pushConsumer.shutdown(); // stops the consumer that init() started
+ }
+
+ private PullRequest createPullRequest() {
+ // Builds a pull request for queue 0 of the test topic on the test broker, starting at offset 1024.
+ MessageQueue queue = new MessageQueue();
+ queue.setTopic(topic);
+ queue.setBrokerName(brokerName);
+ queue.setQueueId(0);
+ // The process queue is marked locked, with its lock timestamp set to the current time.
+ ProcessQueue lockedQueue = new ProcessQueue();
+ lockedQueue.setLocked(true);
+ lockedQueue.setLastLockTimestamp(System.currentTimeMillis());
+ PullRequest request = new PullRequest();
+ request.setConsumerGroup(consumerGroup);
+ request.setNextOffset(1024);
+ request.setMessageQueue(queue);
+ request.setProcessQueue(lockedQueue);
+ return request;
+ }
+
+ private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) throws Exception {
+ ByteArrayOutputStream encoded = new ByteArrayOutputStream(); // collects the encoded bytes of every message, in list order
+ for (int i = 0; i < messageExtList.size(); i++) {
+ encoded.write(MessageDecoder.encode(messageExtList.get(i), false));
+ }
+ long offsetAfterBatch = requestHeader.getQueueOffset() + messageExtList.size(); // requested offset advanced by the batch size
+ return new PullResultExt(pullStatus, offsetAfterBatch, 123, 2048, messageExtList, 0, encoded.toByteArray());
+ }
+}