You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by pi...@apache.org on 2022/05/16 02:07:21 UTC

[rocketmq-mqtt] branch main updated: fix #97 and add unit test

This is an automated email from the ASF dual-hosted git repository.

pingww pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e27eb2  fix #97 and add unit test
     new 4002cff  Merge pull request #98 from YxAc/fix_wrong_judg_toLmqMessage
8e27eb2 is described below

commit 8e27eb2601ac9cde3e06a0f8d49fd78f199cefa6
Author: AhaThinking <ah...@AhaThinkingdeMacBook-Pro.local>
AuthorDate: Sun May 15 21:18:15 2022 +0800

    fix #97 and add unit test
---
 .../mqtt/ds/store/LmqQueueStoreManager.java        |  4 +-
 .../test/{ => store}/TestLmqQueueStoreManager.java | 98 +++++++++++++++++++++-
 2 files changed, 97 insertions(+), 5 deletions(-)

diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 1d7624a..ce83dae 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -128,9 +128,9 @@ public class LmqQueueStoreManager implements LmqQueueStore {
         message.setOffset(parseLmqOffset(queue, mqMessage));
         if (StringUtils.isNotBlank(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC))) {
             message.setOriginTopic(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC));
-        } else if (StringUtils.isNotBlank(message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
+        } else if (StringUtils.isNotBlank(mqMessage.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
             // maybe from rmq
-            String s = message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+            String s = mqMessage.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
             String[] lmqSet = s.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
             for (String lmq : lmqSet) {
                 if (TopicUtils.isWildCard(lmq)) {
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
similarity index 50%
rename from mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java
rename to mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
index c8e8cd5..a82c593 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
@@ -17,10 +17,14 @@
  *
  */
 
-package org.apache.rocketmq.mqtt.ds.test;
+package org.apache.rocketmq.mqtt.ds.test.store;
 
+import com.alibaba.fastjson.JSON;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
@@ -32,7 +36,10 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.mqtt.common.model.Constants;
 import org.apache.rocketmq.mqtt.common.model.Message;
+import org.apache.rocketmq.mqtt.common.model.MessageEvent;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.QueueOffset;
 import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
@@ -47,14 +54,26 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.lang.reflect.InvocationTargetException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -93,8 +112,13 @@ public class TestLmqQueueStoreManager {
         Set<String> queues = new HashSet<>(Arrays.asList("test"));
         Message message = new Message();
         message.setOriginTopic("test");
+        message.putUserProperty(Message.extPropertyQoS, "1");
+        message.putUserProperty(Message.extPropertyCleanSessionFlag, "test");
+        message.putUserProperty(Message.extPropertyClientId, "clientId");
+
         lmqQueueStoreManager.putMessage(queues, message);
-        ArgumentCaptor<org.apache.rocketmq.common.message.Message> argumentCaptor = ArgumentCaptor.forClass(org.apache.rocketmq.common.message.Message.class);
+        ArgumentCaptor<org.apache.rocketmq.common.message.Message> argumentCaptor = ArgumentCaptor.forClass(
+                org.apache.rocketmq.common.message.Message.class);
         verify(defaultMQProducer).send(argumentCaptor.capture(), any(SendCallback.class));
         Assert.assertTrue(null != argumentCaptor.getValue().getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH));
     }
@@ -109,11 +133,79 @@ public class TestLmqQueueStoreManager {
         when(rebalanceImpl.getmQClientFactory()).thenReturn(mqClientInstance);
         MQClientAPIImpl mqClientAPI = mock(MQClientAPIImpl.class);
         when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPI);
-        when(mqClientInstance.findBrokerAddressInSubscribe(any(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("test", false));
+        when(mqClientInstance.findBrokerAddressInSubscribe(any(), anyLong(), anyBoolean())).thenReturn(
+                new FindBrokerResult("test", false));
 
         lmqQueueStoreManager.pullMessage("test", new Queue(), new QueueOffset(), 1);
 
         verify(mqClientAPI).pullMessage(any(), any(), anyLong(), any(), any());
     }
 
+    @Test
+    public void testToLmqPullRequest() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
+        Queue queue = new Queue(0, "t/t1/", "localhost");
+        MessageExt mqMessage = new MessageExt();
+        mqMessage.setMsgId("testToLmq");
+        mqMessage.setTopic("t/t1/");
+        mqMessage.setBody(JSON.toJSONString(Collections.singletonList(new MessageEvent())).getBytes(StandardCharsets.UTF_8));
+        Properties properties = new Properties();
+        properties.setProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, "%LMQ%t%t1%");
+        properties.setProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, "666");
+        properties.setProperty(Constants.PROPERTY_MQTT_RETRY_TIMES, "3");
+        properties.setProperty(Constants.PROPERTY_MQTT_EXT_DATA, JSON.toJSONString(new HashMap<>()));
+        FieldUtils.writeField(mqMessage, "properties", properties, true);
+
+        PullResult pullResult = new org.apache.rocketmq.client.consumer.PullResult(PullStatus.OFFSET_ILLEGAL,
+                5, 0, 100, Collections.singletonList(mqMessage));
+
+        Object pullResultObj = MethodUtils.invokeMethod(lmqQueueStoreManager, true, "toLmqPullResult", queue, pullResult);
+        org.apache.rocketmq.mqtt.common.model.PullResult mqttPullResult =
+                (org.apache.rocketmq.mqtt.common.model.PullResult) pullResultObj;
+        Message message = mqttPullResult.getMessageList().iterator().next();
+
+        Assert.assertEquals("testToLmq", message.getMsgId());
+        Assert.assertEquals("t/t1/", message.getOriginTopic());
+        Assert.assertEquals("t/t1/", message.getFirstTopic());
+        Assert.assertEquals(3, message.getRetry());
+        Assert.assertEquals(666, message.getOffset());
+    }
+
+    @Test
+    public void testPullLastMessages() {
+        final long pullCount = 100, maxOffset = 5;
+        Queue queue = new Queue(0, "t/t1/", "localhost");
+        LmqQueueStoreManager spyLmqQueueStoreManager = spy(lmqQueueStoreManager);
+        CompletableFuture<Long> maxOffsetFuture = new CompletableFuture<>();
+        maxOffsetFuture.complete(maxOffset);
+        doReturn(maxOffsetFuture).when(spyLmqQueueStoreManager).queryQueueMaxOffset(queue);
+
+        QueueOffset verifyOffset = new QueueOffset();
+        verifyOffset.setOffset(0);
+        doReturn(null).when(spyLmqQueueStoreManager).pullMessage(eq("test"), eq(queue), eq(verifyOffset), eq(pullCount));
+
+        spyLmqQueueStoreManager.pullLastMessages("test", queue, pullCount);
+        verify(spyLmqQueueStoreManager).pullMessage(eq("test"), eq(queue), eq(verifyOffset), eq(pullCount));
+    }
+
+    @Test
+    public void testQueryQueueMaxOffset() throws Exception {
+        Queue queue = new Queue(0, "t/t1/", "localhost");
+        final long maxOffset = 5;
+        DefaultMQPullConsumerImpl defaultMQPullConsumerImpl = mock(DefaultMQPullConsumerImpl.class);
+        when(defaultMQPullConsumer.getDefaultMQPullConsumerImpl()).thenReturn(defaultMQPullConsumerImpl);
+        RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
+        when(defaultMQPullConsumerImpl.getRebalanceImpl()).thenReturn(rebalanceImpl);
+        MQClientInstance mqClientInstance = mock(MQClientInstance.class);
+        when(rebalanceImpl.getmQClientFactory()).thenReturn(mqClientInstance);
+        MQClientAPIImpl mqClientAPI = mock(MQClientAPIImpl.class);
+        when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPI);
+        when(mqClientInstance.findBrokerAddressInPublish(anyString())).thenReturn(
+                String.valueOf(new FindBrokerResult("test", false)));
+        when(mqClientAPI.getMaxOffset(anyString(), anyString(), anyInt(), anyLong())).thenReturn(maxOffset);
+
+        CompletableFuture<Long> queryOffsetFuture = lmqQueueStoreManager.queryQueueMaxOffset(queue);
+        verify(mqClientInstance, times(0)).updateTopicRouteInfoFromNameServer(any());
+        Assert.assertEquals(maxOffset, queryOffsetFuture.get().longValue());
+    }
+
 }