You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2022/03/29 04:32:04 UTC

[rocketmq-mqtt] 36/43: [ISSUE #19] fix '#' check in topicFilter (#25)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git

commit da74925fcc733d3f18cd7cdd41d390256bd462bd
Author: wangfan <42...@users.noreply.github.com>
AuthorDate: Tue Mar 15 15:47:49 2022 +0800

    [ISSUE #19] fix '#' check in topicFilter (#25)
    
    * fix '#' check in topicFilter
    
    link #19
    
    * remove email & author info
    
    * Add licence header to file.
    
    Co-authored-by: wangfan <wa...@xiaomi.com>
    Co-authored-by: dinglei <li...@163.com>
---
 .../rocketmq/mqtt/common/model/Constants.java      |  4 +-
 .../rocketmq/mqtt/common/model/Subscription.java   |  2 +-
 .../apache/rocketmq/mqtt/common/model/Trie.java    | 18 +++----
 .../rocketmq/mqtt/common/util/TopicUtils.java      | 19 +++----
 .../rocketmq/mqtt/common/test/TestTopicUtils.java  | 60 ++++++++++++++++++++++
 .../rocketmq/mqtt/cs/session/infly/InFlyCache.java |  8 +--
 .../mqtt/cs/session/match/MatchAction.java         |  4 +-
 .../rocketmq/mqtt/ds/meta/WildcardManager.java     |  4 +-
 .../org/apache/rocketmq/mqtt/ds/mq/MqConsumer.java |  4 +-
 .../org/apache/rocketmq/mqtt/ds/mq/MqFactory.java  |  8 +--
 .../org/apache/rocketmq/mqtt/ds/mq/MqProducer.java |  6 +--
 .../apache/rocketmq/mqtt/ds/mq/MqPullConsumer.java |  4 +-
 12 files changed, 94 insertions(+), 47 deletions(-)

diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
index c7e638b..f55afbb 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
@@ -21,8 +21,8 @@ public class Constants {
     public static final String NAMESPACE_SPLITER = "%";
     public static final String MQTT_TOPIC_DELIMITER = "/";
 
-    public static final String ADDFLAG = "+";
-    public static final String JINFLAG = "#";
+    public static final String PLUS_SIGN = "+";
+    public static final String NUMBER_SIGN = "#";
 
     public static final String P2P = "/p2p/";
     public static final String RETRY = "/retry/";
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
index 94b6a6c..18fb317 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
@@ -38,7 +38,7 @@ public class Subscription {
 
     public boolean isWildCard() {
         return topicFilter != null &&
-                (topicFilter.contains(Constants.JINFLAG) || topicFilter.contains(Constants.ADDFLAG));
+                (topicFilter.contains(Constants.NUMBER_SIGN) || topicFilter.contains(Constants.PLUS_SIGN));
     }
 
     public String toFirstTopic() {
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Trie.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Trie.java
index 7333a91..675d04b 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Trie.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Trie.java
@@ -148,27 +148,27 @@ public class Trie<K, V> {
                 builder.delete(start, builder.length());
             }
             //match the #
-            TrieNode jinMatch = currentNode.children.get(Constants.JINFLAG);
+            TrieNode jinMatch = currentNode.children.get(Constants.NUMBER_SIGN);
             if (jinMatch != null) {
                 int start = builder.length();
-                builder.append(Constants.JINFLAG).append(Constants.MQTT_TOPIC_DELIMITER);
+                builder.append(Constants.NUMBER_SIGN).append(Constants.MQTT_TOPIC_DELIMITER);
                 result.addAll(findValuePath(jinMatch, topicArray, level + 1, maxLevel, builder, true));
                 builder.delete(start, builder.length());
             }
             //match the +
-            TrieNode jiaMatch = currentNode.children.get(Constants.ADDFLAG);
+            TrieNode jiaMatch = currentNode.children.get(Constants.PLUS_SIGN);
             if (jiaMatch != null) {
                 int start = builder.length();
-                builder.append(Constants.ADDFLAG).append(Constants.MQTT_TOPIC_DELIMITER);
+                builder.append(Constants.PLUS_SIGN).append(Constants.MQTT_TOPIC_DELIMITER);
                 result.addAll(findValuePath(jiaMatch, topicArray, level + 1, maxLevel, builder, false));
                 builder.delete(start, builder.length());
             }
         } else {
             //match the #
-            TrieNode jinMatch = currentNode.children.get(Constants.JINFLAG);
+            TrieNode jinMatch = currentNode.children.get(Constants.NUMBER_SIGN);
             if (jinMatch != null) {
                 int start = builder.length();
-                builder.append(Constants.JINFLAG).append(Constants.MQTT_TOPIC_DELIMITER);
+                builder.append(Constants.NUMBER_SIGN).append(Constants.MQTT_TOPIC_DELIMITER);
                 result.addAll(findValuePath(jinMatch, topicArray, level + 1, maxLevel, builder, true));
                 builder.delete(start, builder.length());
             }
@@ -207,19 +207,19 @@ public class Trie<K, V> {
                 result.putAll(findValueSet(trieNode, topicArray, level + 1, maxLevel, false));
             }
             //match the #
-            TrieNode jinMatch = currentNode.children.get(Constants.JINFLAG);
+            TrieNode jinMatch = currentNode.children.get(Constants.NUMBER_SIGN);
             if (jinMatch != null) {
                 result.putAll(findValueSet(jinMatch, topicArray, level + 1, maxLevel, true));
             }
             //match the +
-            TrieNode jiaMatch = currentNode.children.get(Constants.ADDFLAG);
+            TrieNode jiaMatch = currentNode.children.get(Constants.PLUS_SIGN);
             if (jiaMatch != null) {
                 result.putAll(findValueSet(jiaMatch, topicArray, level + 1, maxLevel, false));
             }
             return result;
         } else {
             //match the #
-            TrieNode jinMatch = currentNode.children.get(Constants.JINFLAG);
+            TrieNode jinMatch = currentNode.children.get(Constants.NUMBER_SIGN);
             if (jinMatch != null) {
                 result.putAll(findValueSet(jinMatch, topicArray, level + 1, maxLevel, true));
             }
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
index 8a618b5..91cb875 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
@@ -130,7 +130,7 @@ public class TopicUtils {
 
     public static boolean isWildCard(String topicFilter) {
         return topicFilter != null &&
-                (topicFilter.contains(Constants.JINFLAG) || topicFilter.contains(Constants.ADDFLAG));
+                (topicFilter.contains(Constants.NUMBER_SIGN) || topicFilter.contains(Constants.PLUS_SIGN));
     }
 
     public static boolean isMatch(String topic, String topicFilter) {
@@ -150,20 +150,21 @@ public class TopicUtils {
         for (int i = 0; i < minTopicLength; i++) {
             String sourceTopic = subscribeTopics[i];
 
-            if (!Constants.JINFLAG.equals(sourceTopic) &&
-                    !Constants.ADDFLAG.equals(sourceTopic)) {
+            if (!isWildCard(sourceTopic)) {
                 if (!sourceTopic.equals(messageTopics[i])) {
                     return false;
                 }
             }
             // multi level
-            if (Constants.JINFLAG.equals(sourceTopic)) {
-                return true;
+            // [MQTT-4.7.1-2] In either case '#' MUST be the last character specified in the Topic Filter
+            // and "t/t1#" is invalid
+            if (Constants.NUMBER_SIGN.equals(sourceTopic)) {
+                return i == sourceTopicLength - 1;
             }
             boolean last = i == minTopicLength - 1 &&
                     (sourceTopicLength == targetTopicLength ||
                             (sourceTopicLength == targetTopicLength + 1 &&
-                                    Constants.JINFLAG.equals(subscribeTopics[sourceTopicLength - 1])
+                                    Constants.NUMBER_SIGN.equals(subscribeTopics[sourceTopicLength - 1])
                             )
                     );
             if (last) {
@@ -184,10 +185,4 @@ public class TopicUtils {
     public static String wrapP2pLmq(String clientId) {
         return normalizeTopic(Constants.P2P + clientId);
     }
-
-    public static void main(String[] args) {
-        String topic = "/t/t1/t2";
-        String topicFilter = "/t/t1/t2";
-        System.out.println(TopicUtils.isMatch(topic, topicFilter));
-    }
 }
diff --git a/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/TestTopicUtils.java b/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/TestTopicUtils.java
new file mode 100644
index 0000000..c30906f
--- /dev/null
+++ b/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/TestTopicUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.common.test;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.mqtt.common.model.Trie;
+import org.apache.rocketmq.mqtt.common.util.TopicUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTopicUtils {
+
+  @Test
+  public void testTopicMatch() {
+    String topic = "t/t1/t2/";
+    String topicFilter = "t/t1/t2/";
+    Assert.assertTrue(TopicUtils.isMatch(topic, topicFilter));
+
+    topicFilter = "t/#/";
+    Assert.assertTrue(TopicUtils.isMatch(topic, topicFilter));
+
+    topicFilter = "t/t1/+/";
+    Assert.assertTrue(TopicUtils.isMatch(topic, topicFilter));
+
+    topicFilter = "t/+/#/";
+    Assert.assertTrue(TopicUtils.isMatch(topic, topicFilter));
+
+    topicFilter = "t/#/t2/";
+    Assert.assertFalse(TopicUtils.isMatch(topic, topicFilter));
+
+    topicFilter = "t/#/+/";
+    Assert.assertFalse(TopicUtils.isMatch(topic, topicFilter));
+
+    topicFilter = "t/t1/t2/t3/";
+    Assert.assertFalse(TopicUtils.isMatch(topic, topicFilter));
+
+    topicFilter = "t/t1/#/t3/";
+    Assert.assertFalse(TopicUtils.isMatch(topic, topicFilter));
+
+    topicFilter = "t/t1#/";
+    Assert.assertFalse(TopicUtils.isMatch(topic, topicFilter));
+
+
+  }
+}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
index 9c5ef3d..849e97e 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
@@ -61,9 +61,7 @@ public class InFlyCache {
 
     public void put(CacheType cacheType, String channelId, int mqttMsgId) {
         ConcurrentMap<String, Set<Integer>> cache = whichCache(cacheType);
-        if (!cache.containsKey(channelId)) {
-            cache.putIfAbsent(channelId, new HashSet<>());
-        }
+        cache.putIfAbsent(channelId, new HashSet<>());
         Set<Integer> idCache = cache.get(channelId);
         if (idCache == null) {
             return;
@@ -111,9 +109,7 @@ public class InFlyCache {
             pendingDown.setSubscription(subscription);
             pendingDown.setQueue(queue);
             pendingDown.setSeqId(message.getOffset());
-            if (!cache.containsKey(channelId)) {
-                cache.putIfAbsent(channelId, new ConcurrentHashMap<>(16));
-            }
+            cache.putIfAbsent(channelId, new ConcurrentHashMap<>(16));
             cache.get(channelId).put(mqttMsgId, pendingDown);
             return pendingDown;
         }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/match/MatchAction.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/match/MatchAction.java
index d26e94e..5b509d3 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/match/MatchAction.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/match/MatchAction.java
@@ -120,9 +120,7 @@ public class MatchAction {
             }
 
             synchronized (topicCache) {
-                if (!topicCache.containsKey(topicFilter)) {
-                    topicCache.putIfAbsent(topicFilter, new HashSet<>());
-                }
+                topicCache.putIfAbsent(topicFilter, new HashSet<>());
                 topicCache.get(topicFilter).add(channelId);
             }
         }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
index 594e690..e40733e 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
@@ -99,9 +99,7 @@ public class WildcardManager {
             queueNames.add(pubTopic);
             Set<String> wildcards = matchWildcards(pubTopic);
             if (wildcards != null && !wildcards.isEmpty()) {
-                for (String wildcard : wildcards) {
-                    queueNames.add(wildcard);
-                }
+                queueNames.addAll(wildcards);
             }
         }
         return queueNames;
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqConsumer.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqConsumer.java
index 6f170e1..c576479 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqConsumer.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqConsumer.java
@@ -40,11 +40,11 @@ public class MqConsumer  {
             defaultMQPushConsumer.setConsumeThreadMin(Integer.valueOf((String)properties.get("threadNum")));
             defaultMQPushConsumer.setConsumeThreadMax(Integer.valueOf((String)properties.get("threadNum")));
         }
-        defaultMQPushConsumer.setInstanceName(this.buildIntanceName());
+        defaultMQPushConsumer.setInstanceName(this.buildInstanceName());
         defaultMQPushConsumer.setVipChannelEnabled(false);
     }
 
-    public String buildIntanceName() {
+    public String buildInstanceName() {
         return Integer.toString(UtilAll.getPid())
                 + "#" + System.nanoTime();
     }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqFactory.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqFactory.java
index 64962db..16de0ba 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqFactory.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqFactory.java
@@ -60,7 +60,7 @@ public class MqFactory {
     public static DefaultMQProducer buildDefaultMQProducer(String group, String nameSrv) {
         DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
         defaultMQProducer.setNamesrvAddr(nameSrv);
-        defaultMQProducer.setInstanceName(buildIntanceName());
+        defaultMQProducer.setInstanceName(buildInstanceName());
         defaultMQProducer.setVipChannelEnabled(false);
         defaultMQProducer.setProducerGroup(group);
         return defaultMQProducer;
@@ -76,7 +76,7 @@ public class MqFactory {
             defaultMQPushConsumer.setConsumeThreadMin(Integer.valueOf((String) properties.get("threadNum")));
             defaultMQPushConsumer.setConsumeThreadMax(Integer.valueOf((String) properties.get("threadNum")));
         }
-        defaultMQPushConsumer.setInstanceName(buildIntanceName());
+        defaultMQPushConsumer.setInstanceName(buildInstanceName());
         defaultMQPushConsumer.setVipChannelEnabled(false);
         defaultMQPushConsumer.setConsumerGroup(group);
         if (messageListener instanceof MessageListenerOrderly) {
@@ -90,7 +90,7 @@ public class MqFactory {
     public static DefaultMQPullConsumer buildDefaultMQPullConsumer(String group, String nameSrv) {
         DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer();
         defaultMQPullConsumer.setNamesrvAddr(nameSrv);
-        defaultMQPullConsumer.setInstanceName(buildIntanceName());
+        defaultMQPullConsumer.setInstanceName(buildInstanceName());
         defaultMQPullConsumer.setVipChannelEnabled(false);
         defaultMQPullConsumer.setConsumerGroup(group);
         return defaultMQPullConsumer;
@@ -104,7 +104,7 @@ public class MqFactory {
         return defaultMQAdminExt;
     }
 
-    public static String buildIntanceName() {
+    public static String buildInstanceName() {
         return Integer.toString(UtilAll.getPid())
                 + "#" + System.nanoTime();
     }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqProducer.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqProducer.java
index 0c3bc76..d4b479b 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqProducer.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqProducer.java
@@ -27,16 +27,16 @@ import java.util.Properties;
 
 public class MqProducer   {
 
-    private DefaultMQProducer defaultMQProducer;
+    private final DefaultMQProducer defaultMQProducer;
 
     public MqProducer(Properties properties) {
         defaultMQProducer = new DefaultMQProducer();
         defaultMQProducer.setNamesrvAddr(properties.getProperty("NAMESRV_ADDR"));
-        defaultMQProducer.setInstanceName(buildIntanceName());
+        defaultMQProducer.setInstanceName(buildInstanceName());
         defaultMQProducer.setVipChannelEnabled(false);
     }
 
-    public String buildIntanceName() {
+    public String buildInstanceName() {
         return Integer.toString(UtilAll.getPid())
                 + "#" + System.nanoTime();
     }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqPullConsumer.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqPullConsumer.java
index 8748cc7..f39b627 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqPullConsumer.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqPullConsumer.java
@@ -30,11 +30,11 @@ public class MqPullConsumer {
     public MqPullConsumer(Properties properties) {
         defaultMQPullConsumer = new DefaultMQPullConsumer();
         defaultMQPullConsumer.setNamesrvAddr(properties.getProperty("NAMESRV_ADDR"));
-        defaultMQPullConsumer.setInstanceName(this.buildIntanceName());
+        defaultMQPullConsumer.setInstanceName(this.buildInstanceName());
         defaultMQPullConsumer.setVipChannelEnabled(false);
     }
 
-    public String buildIntanceName() {
+    public String buildInstanceName() {
         return Integer.toString(UtilAll.getPid())
                 + "#" + System.nanoTime();
     }