You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by pi...@apache.org on 2022/05/16 02:07:21 UTC
[rocketmq-mqtt] branch main updated: fix #97 and add unit test
This is an automated email from the ASF dual-hosted git repository.
pingww pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new 8e27eb2 fix #97 and add unit test
new 4002cff Merge pull request #98 from YxAc/fix_wrong_judg_toLmqMessage
8e27eb2 is described below
commit 8e27eb2601ac9cde3e06a0f8d49fd78f199cefa6
Author: AhaThinking <ah...@AhaThinkingdeMacBook-Pro.local>
AuthorDate: Sun May 15 21:18:15 2022 +0800
fix #97 and add unit test
---
.../mqtt/ds/store/LmqQueueStoreManager.java | 4 +-
.../test/{ => store}/TestLmqQueueStoreManager.java | 98 +++++++++++++++++++++-
2 files changed, 97 insertions(+), 5 deletions(-)
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 1d7624a..ce83dae 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -128,9 +128,9 @@ public class LmqQueueStoreManager implements LmqQueueStore {
message.setOffset(parseLmqOffset(queue, mqMessage));
if (StringUtils.isNotBlank(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC))) {
message.setOriginTopic(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC));
- } else if (StringUtils.isNotBlank(message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
+ } else if (StringUtils.isNotBlank(mqMessage.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
// maybe from rmq
- String s = message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+ String s = mqMessage.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
String[] lmqSet = s.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
for (String lmq : lmqSet) {
if (TopicUtils.isWildCard(lmq)) {
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
similarity index 50%
rename from mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java
rename to mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
index c8e8cd5..a82c593 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
@@ -17,10 +17,14 @@
*
*/
-package org.apache.rocketmq.mqtt.ds.test;
+package org.apache.rocketmq.mqtt.ds.test.store;
+import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
@@ -32,7 +36,10 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.mqtt.common.model.Constants;
import org.apache.rocketmq.mqtt.common.model.Message;
+import org.apache.rocketmq.mqtt.common.model.MessageEvent;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
@@ -47,14 +54,26 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -93,8 +112,13 @@ public class TestLmqQueueStoreManager {
Set<String> queues = new HashSet<>(Arrays.asList("test"));
Message message = new Message();
message.setOriginTopic("test");
+ message.putUserProperty(Message.extPropertyQoS, "1");
+ message.putUserProperty(Message.extPropertyCleanSessionFlag, "test");
+ message.putUserProperty(Message.extPropertyClientId, "clientId");
+
lmqQueueStoreManager.putMessage(queues, message);
- ArgumentCaptor<org.apache.rocketmq.common.message.Message> argumentCaptor = ArgumentCaptor.forClass(org.apache.rocketmq.common.message.Message.class);
+ ArgumentCaptor<org.apache.rocketmq.common.message.Message> argumentCaptor = ArgumentCaptor.forClass(
+ org.apache.rocketmq.common.message.Message.class);
verify(defaultMQProducer).send(argumentCaptor.capture(), any(SendCallback.class));
Assert.assertTrue(null != argumentCaptor.getValue().getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH));
}
@@ -109,11 +133,79 @@ public class TestLmqQueueStoreManager {
when(rebalanceImpl.getmQClientFactory()).thenReturn(mqClientInstance);
MQClientAPIImpl mqClientAPI = mock(MQClientAPIImpl.class);
when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPI);
- when(mqClientInstance.findBrokerAddressInSubscribe(any(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("test", false));
+ when(mqClientInstance.findBrokerAddressInSubscribe(any(), anyLong(), anyBoolean())).thenReturn(
+ new FindBrokerResult("test", false));
lmqQueueStoreManager.pullMessage("test", new Queue(), new QueueOffset(), 1);
verify(mqClientAPI).pullMessage(any(), any(), anyLong(), any(), any());
}
+ @Test
+ public void testToLmqPullRequest() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
+ Queue queue = new Queue(0, "t/t1/", "localhost");
+ MessageExt mqMessage = new MessageExt();
+ mqMessage.setMsgId("testToLmq");
+ mqMessage.setTopic("t/t1/");
+ mqMessage.setBody(JSON.toJSONString(Collections.singletonList(new MessageEvent())).getBytes(StandardCharsets.UTF_8));
+ Properties properties = new Properties();
+ properties.setProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, "%LMQ%t%t1%");
+ properties.setProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, "666");
+ properties.setProperty(Constants.PROPERTY_MQTT_RETRY_TIMES, "3");
+ properties.setProperty(Constants.PROPERTY_MQTT_EXT_DATA, JSON.toJSONString(new HashMap<>()));
+ FieldUtils.writeField(mqMessage, "properties", properties, true);
+
+ PullResult pullResult = new org.apache.rocketmq.client.consumer.PullResult(PullStatus.OFFSET_ILLEGAL,
+ 5, 0, 100, Collections.singletonList(mqMessage));
+
+ Object pullResultObj = MethodUtils.invokeMethod(lmqQueueStoreManager, true, "toLmqPullResult", queue, pullResult);
+ org.apache.rocketmq.mqtt.common.model.PullResult mqttPullResult =
+ (org.apache.rocketmq.mqtt.common.model.PullResult) pullResultObj;
+ Message message = mqttPullResult.getMessageList().iterator().next();
+
+ Assert.assertEquals("testToLmq", message.getMsgId());
+ Assert.assertEquals("t/t1/", message.getOriginTopic());
+ Assert.assertEquals("t/t1/", message.getFirstTopic());
+ Assert.assertEquals(3, message.getRetry());
+ Assert.assertEquals(666, message.getOffset());
+ }
+
+ @Test
+ public void testPullLastMessages() {
+ final long pullCount = 100, maxOffset = 5;
+ Queue queue = new Queue(0, "t/t1/", "localhost");
+ LmqQueueStoreManager spyLmqQueueStoreManager = spy(lmqQueueStoreManager);
+ CompletableFuture<Long> maxOffsetFuture = new CompletableFuture<>();
+ maxOffsetFuture.complete(maxOffset);
+ doReturn(maxOffsetFuture).when(spyLmqQueueStoreManager).queryQueueMaxOffset(queue);
+
+ QueueOffset verifyOffset = new QueueOffset();
+ verifyOffset.setOffset(0);
+ doReturn(null).when(spyLmqQueueStoreManager).pullMessage(eq("test"), eq(queue), eq(verifyOffset), eq(pullCount));
+
+ spyLmqQueueStoreManager.pullLastMessages("test", queue, pullCount);
+ verify(spyLmqQueueStoreManager).pullMessage(eq("test"), eq(queue), eq(verifyOffset), eq(pullCount));
+ }
+
+ @Test
+ public void testQueryQueueMaxOffset() throws Exception {
+ Queue queue = new Queue(0, "t/t1/", "localhost");
+ final long maxOffset = 5;
+ DefaultMQPullConsumerImpl defaultMQPullConsumerImpl = mock(DefaultMQPullConsumerImpl.class);
+ when(defaultMQPullConsumer.getDefaultMQPullConsumerImpl()).thenReturn(defaultMQPullConsumerImpl);
+ RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
+ when(defaultMQPullConsumerImpl.getRebalanceImpl()).thenReturn(rebalanceImpl);
+ MQClientInstance mqClientInstance = mock(MQClientInstance.class);
+ when(rebalanceImpl.getmQClientFactory()).thenReturn(mqClientInstance);
+ MQClientAPIImpl mqClientAPI = mock(MQClientAPIImpl.class);
+ when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPI);
+ when(mqClientInstance.findBrokerAddressInPublish(anyString())).thenReturn(
+ String.valueOf(new FindBrokerResult("test", false)));
+ when(mqClientAPI.getMaxOffset(anyString(), anyString(), anyInt(), anyLong())).thenReturn(maxOffset);
+
+ CompletableFuture<Long> queryOffsetFuture = lmqQueueStoreManager.queryQueueMaxOffset(queue);
+ verify(mqClientInstance, times(0)).updateTopicRouteInfoFromNameServer(any());
+ Assert.assertEquals(maxOffset, queryOffsetFuture.get().longValue());
+ }
+
}