You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2022/03/29 04:31:45 UTC

[rocketmq-mqtt] 17/43: fix the multi level topic special char

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git

commit 531f7a91d1dd52d114099e25ff8fd713762556f3
Author: tianliuliu <64...@qq.com>
AuthorDate: Thu Mar 10 20:02:57 2022 +0800

    fix the multi level topic special char
---
 .../rocketmq/mqtt/common/facade/LmqQueueStore.java |  4 ---
 .../rocketmq/mqtt/ds/notify/NotifyManager.java     | 13 +++++-----
 .../mqtt/ds/store/LmqOffsetStoreManager.java       |  9 +++----
 .../mqtt/ds/store/LmqQueueStoreManager.java        | 29 +++++++++++-----------
 .../mqtt/ds/test/TestLmqQueueStoreManager.java     |  4 +--
 .../rocketmq/mqtt/example/RocketMQProducer.java    |  9 ++++---
 6 files changed, 33 insertions(+), 35 deletions(-)

diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
index 5da4d7d..b393abe 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
@@ -27,10 +27,6 @@ import org.apache.rocketmq.mqtt.common.model.QueueOffset;
 import org.apache.rocketmq.mqtt.common.model.StoreResult;
 
 public interface LmqQueueStore {
-    String LMQ_PREFIX = "%LMQ%";
-    String PROPERTY_INNER_MULTI_DISPATCH = "INNER_MULTI_DISPATCH";
-    String PROPERTY_INNER_MULTI_QUEUE_OFFSET = "INNER_MULTI_QUEUE_OFFSET";
-    String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
 
     /**
      * put message and atomic dispatch to multiple queues
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index 029cbb8..69d7991 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -29,8 +29,8 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
 import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
 import org.apache.rocketmq.mqtt.common.model.Constants;
 import org.apache.rocketmq.mqtt.common.model.MessageEvent;
@@ -195,18 +195,19 @@ public class NotifyManager {
             messageEvent.setPubTopic(message.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC));
             return;
         }
-        if (StringUtils.isNotBlank(message.getUserProperty(LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH))) {
+        if (StringUtils.isNotBlank(message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
             // maybe from rmq
-            String s = message.getUserProperty(LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH);
-            String[] lmqSet = s.split(LmqQueueStore.MULTI_DISPATCH_QUEUE_SPLITTER);
+            String s = message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+            String[] lmqSet = s.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
             for (String lmq : lmqSet) {
                 if (TopicUtils.isWildCard(lmq)) {
                     continue;
                 }
-                if (!lmq.contains(LmqQueueStore.LMQ_PREFIX)) {
+                if (!lmq.contains(MixAll.LMQ_PREFIX)) {
                     continue;
                 }
-                messageEvent.setPubTopic(lmq.replace(LmqQueueStore.LMQ_PREFIX, ""));
+                String originQueue = lmq.replace(MixAll.LMQ_PREFIX, "");
+                messageEvent.setPubTopic(StringUtils.replace(originQueue, "%","/"));
             }
         }
     }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
index 3651945..fd8d718 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
@@ -26,7 +26,6 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
-import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.QueueOffset;
 import org.apache.rocketmq.mqtt.common.model.Subscription;
@@ -81,8 +80,8 @@ public class LmqOffsetStoreManager implements LmqOffsetStore {
                     String brokerAddress = tmpBrokerAddressMap.get(queue.getBrokerName());
                     QueueOffset queueOffset = each.getValue();
                     UpdateConsumerOffsetRequestHeader updateHeader = new UpdateConsumerOffsetRequestHeader();
-                    updateHeader.setTopic(queue.getQueueName());
-                    updateHeader.setConsumerGroup(LmqQueueStore.LMQ_PREFIX + clientId);
+                    updateHeader.setTopic(StringUtils.replace(queue.getQueueName(), "/","%"));
+                    updateHeader.setConsumerGroup(MixAll.LMQ_PREFIX + clientId);
                     updateHeader.setQueueId((int) queue.getQueueId());
                     updateHeader.setCommitOffset(queueOffset.getOffset());
                     defaultMQPullConsumer
@@ -112,8 +111,8 @@ public class LmqOffsetStoreManager implements LmqOffsetStore {
                 map.put(queue, queueOffset);
                 try {
                     QueryConsumerOffsetRequestHeader queryHeader = new QueryConsumerOffsetRequestHeader();
-                    queryHeader.setTopic(queue.getQueueName());
-                    queryHeader.setConsumerGroup(LmqQueueStore.LMQ_PREFIX + clientId);
+                    queryHeader.setTopic(StringUtils.replace(queue.getQueueName(), "/","%"));
+                    queryHeader.setConsumerGroup(MixAll.LMQ_PREFIX + clientId);
                     queryHeader.setQueueId((int) queue.getQueueId());
                     long offset = defaultMQPullConsumer
                             .getDefaultMQPullConsumerImpl()
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index c7a2b00..88b42aa 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -126,15 +126,16 @@ public class LmqQueueStoreManager implements LmqQueueStore {
         message.setOffset(parseLmqOffset(queue, mqMessage));
         if (StringUtils.isNotBlank(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC))) {
             message.setOriginTopic(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC));
-        } else if (StringUtils.isNotBlank(message.getUserProperty(LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH))) {
+        } else if (StringUtils.isNotBlank(message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
             // maybe from rmq
-            String s = message.getUserProperty(LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH);
-            String[] lmqSet = s.split(LmqQueueStore.MULTI_DISPATCH_QUEUE_SPLITTER);
+            String s = message.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+            String[] lmqSet = s.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
             for (String lmq : lmqSet) {
                 if (TopicUtils.isWildCard(lmq)) {
                     continue;
                 }
-                message.setOriginTopic(lmq.replace(LmqQueueStore.LMQ_PREFIX, ""));
+                String originQueue = lmq.replace(MixAll.LMQ_PREFIX, "");
+                message.setOriginTopic(StringUtils.replace(originQueue, "%","/"));
             }
         }
         message.setFirstTopic(mqMessage.getTopic());
@@ -154,18 +155,18 @@ public class LmqQueueStoreManager implements LmqQueueStore {
     }
 
     private long parseLmqOffset(Queue queue, MessageExt mqMessage) {
-        String multiDispatchQueue = mqMessage.getProperty(PROPERTY_INNER_MULTI_DISPATCH);
+        String multiDispatchQueue = mqMessage.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
         if (StringUtils.isBlank(multiDispatchQueue)) {
             return mqMessage.getQueueOffset();
         }
-        String multiQueueOffset = mqMessage.getProperty(PROPERTY_INNER_MULTI_QUEUE_OFFSET);
+        String multiQueueOffset = mqMessage.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
         if (StringUtils.isBlank(multiQueueOffset)) {
             return mqMessage.getQueueOffset();
         }
-        String[] queues = multiDispatchQueue.split(MULTI_DISPATCH_QUEUE_SPLITTER);
-        String[] queueOffsets = multiQueueOffset.split(MULTI_DISPATCH_QUEUE_SPLITTER);
+        String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+        String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
         for (int i = 0; i < queues.length; i++) {
-            if ((LMQ_PREFIX + queue.getQueueName()).equals(queues[i])) {
+            if ((MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%")).equals(queues[i])) {
                 return Long.parseLong(queueOffsets[i]);
             }
         }
@@ -177,10 +178,10 @@ public class LmqQueueStoreManager implements LmqQueueStore {
         CompletableFuture<StoreResult> result = new CompletableFuture<>();
         org.apache.rocketmq.common.message.Message mqMessage = toMQMessage(message);
         mqMessage.setTags(Constants.MQTT_TAG);
-        mqMessage.putUserProperty(PROPERTY_INNER_MULTI_DISPATCH,
+        mqMessage.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
                 StringUtils.join(
-                        queues.stream().map(s -> LMQ_PREFIX + s).collect(Collectors.toSet()),
-                        MULTI_DISPATCH_QUEUE_SPLITTER));
+                        queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
+                        MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
         try {
             long start = System.currentTimeMillis();
             defaultMQProducer.send(mqMessage,
@@ -210,7 +211,7 @@ public class LmqQueueStoreManager implements LmqQueueStore {
         try {
             MessageQueue messageQueue = new MessageQueue(firstTopic, queue.getBrokerName(), (int) queue.getQueueId());
             long start = System.currentTimeMillis();
-            String lmqTopic = LMQ_PREFIX + queue.getQueueName();
+            String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%");
             pull(lmqTopic, messageQueue, queueOffset.getOffset(), (int) count, new PullCallback() {
                 @Override
                 public void onSuccess(org.apache.rocketmq.client.consumer.PullResult pullResult) {
@@ -409,7 +410,7 @@ public class LmqQueueStoreManager implements LmqQueueStore {
     }
 
     private long maxOffset(Queue queue) throws MQClientException {
-        String lmqTopic = LMQ_PREFIX + queue.getQueueName();
+        String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%");
         MQClientInstance mQClientFactory = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory();
         String brokerAddr = mQClientFactory.findBrokerAddressInPublish(queue.getBrokerName());
         if (null == brokerAddr) {
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java
index 01bc106..b43ce6e 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestLmqQueueStoreManager.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.mqtt.common.model.Message;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.QueueOffset;
@@ -50,7 +51,6 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
-import static org.apache.rocketmq.mqtt.common.facade.LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
 
@@ -92,7 +92,7 @@ public class TestLmqQueueStoreManager {
         lmqQueueStoreManager.putMessage(queues, message);
         ArgumentCaptor<org.apache.rocketmq.common.message.Message> argumentCaptor = ArgumentCaptor.forClass(org.apache.rocketmq.common.message.Message.class);
         verify(defaultMQProducer).send(argumentCaptor.capture(), any(SendCallback.class));
-        Assert.assertTrue(null != argumentCaptor.getValue().getUserProperty(PROPERTY_INNER_MULTI_DISPATCH));
+        Assert.assertTrue(null != argumentCaptor.getValue().getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH));
     }
 
     @Test
diff --git a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/RocketMQProducer.java b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/RocketMQProducer.java
index 9c80609..a35d884 100644
--- a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/RocketMQProducer.java
+++ b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/RocketMQProducer.java
@@ -22,8 +22,9 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.mqtt.common.util.TopicUtils;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
@@ -66,10 +67,10 @@ public class RocketMQProducer {
     }
 
     private static void setLmq(Message msg, Set<String> queues) {
-        msg.putUserProperty(LmqQueueStore.PROPERTY_INNER_MULTI_DISPATCH,
+        msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
                 StringUtils.join(
-                        queues.stream().map(s -> LmqQueueStore.LMQ_PREFIX + s).collect(Collectors.toSet()),
-                        LmqQueueStore.MULTI_DISPATCH_QUEUE_SPLITTER));
+                        queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
+                        MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
     }
 
     private static void sendMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {