You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/03/15 07:47:57 UTC
[rocketmq-mqtt] branch main updated: [ISSUE #19] fix '#' check in topicFilter (#25)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new da74925 [ISSUE #19] fix '#' check in topicFilter (#25)
da74925 is described below
commit da74925fcc733d3f18cd7cdd41d390256bd462bd
Author: wangfan <42...@users.noreply.github.com>
AuthorDate: Tue Mar 15 15:47:49 2022 +0800
[ISSUE #19] fix '#' check in topicFilter (#25)
* fix '#' check in topicFilter
link #19
* remove email & author info
* Add licence header to file.
Co-authored-by: wangfan <wa...@xiaomi.com>
Co-authored-by: dinglei <li...@163.com>
---
.../rocketmq/mqtt/common/model/Constants.java | 4 +-
.../rocketmq/mqtt/common/model/Subscription.java | 2 +-
.../apache/rocketmq/mqtt/common/model/Trie.java | 18 +++----
.../rocketmq/mqtt/common/util/TopicUtils.java | 19 +++----
.../rocketmq/mqtt/common/test/TestTopicUtils.java | 60 ++++++++++++++++++++++
.../rocketmq/mqtt/cs/session/infly/InFlyCache.java | 8 +--
.../mqtt/cs/session/match/MatchAction.java | 4 +-
.../rocketmq/mqtt/ds/meta/WildcardManager.java | 4 +-
.../org/apache/rocketmq/mqtt/ds/mq/MqConsumer.java | 4 +-
.../org/apache/rocketmq/mqtt/ds/mq/MqFactory.java | 8 +--
.../org/apache/rocketmq/mqtt/ds/mq/MqProducer.java | 6 +--
.../apache/rocketmq/mqtt/ds/mq/MqPullConsumer.java | 4 +-
12 files changed, 94 insertions(+), 47 deletions(-)
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
index c7e638b..f55afbb 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
@@ -21,8 +21,8 @@ public class Constants {
public static final String NAMESPACE_SPLITER = "%";
public static final String MQTT_TOPIC_DELIMITER = "/";
- public static final String ADDFLAG = "+";
- public static final String JINFLAG = "#";
+ public static final String PLUS_SIGN = "+";
+ public static final String NUMBER_SIGN = "#";
public static final String P2P = "/p2p/";
public static final String RETRY = "/retry/";
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
index 94b6a6c..18fb317 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
@@ -38,7 +38,7 @@ public class Subscription {
public boolean isWildCard() {
return topicFilter != null &&
- (topicFilter.contains(Constants.JINFLAG) || topicFilter.contains(Constants.ADDFLAG));
+ (topicFilter.contains(Constants.NUMBER_SIGN) || topicFilter.contains(Constants.PLUS_SIGN));
}
public String toFirstTopic() {
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Trie.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Trie.java
index 7333a91..675d04b 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Trie.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Trie.java
@@ -148,27 +148,27 @@ public class Trie<K, V> {
builder.delete(start, builder.length());
}
//match the #
- TrieNode jinMatch = currentNode.children.get(Constants.JINFLAG);
+ TrieNode jinMatch = currentNode.children.get(Constants.NUMBER_SIGN);
if (jinMatch != null) {
int start = builder.length();
- builder.append(Constants.JINFLAG).append(Constants.MQTT_TOPIC_DELIMITER);
+ builder.append(Constants.NUMBER_SIGN).append(Constants.MQTT_TOPIC_DELIMITER);
result.addAll(findValuePath(jinMatch, topicArray, level + 1, maxLevel, builder, true));
builder.delete(start, builder.length());
}
//match the +
- TrieNode jiaMatch = currentNode.children.get(Constants.ADDFLAG);
+ TrieNode jiaMatch = currentNode.children.get(Constants.PLUS_SIGN);
if (jiaMatch != null) {
int start = builder.length();
- builder.append(Constants.ADDFLAG).append(Constants.MQTT_TOPIC_DELIMITER);
+ builder.append(Constants.PLUS_SIGN).append(Constants.MQTT_TOPIC_DELIMITER);
result.addAll(findValuePath(jiaMatch, topicArray, level + 1, maxLevel, builder, false));
builder.delete(start, builder.length());
}
} else {
//match the #
- TrieNode jinMatch = currentNode.children.get(Constants.JINFLAG);
+ TrieNode jinMatch = currentNode.children.get(Constants.NUMBER_SIGN);
if (jinMatch != null) {
int start = builder.length();
- builder.append(Constants.JINFLAG).append(Constants.MQTT_TOPIC_DELIMITER);
+ builder.append(Constants.NUMBER_SIGN).append(Constants.MQTT_TOPIC_DELIMITER);
result.addAll(findValuePath(jinMatch, topicArray, level + 1, maxLevel, builder, true));
builder.delete(start, builder.length());
}
@@ -207,19 +207,19 @@ public class Trie<K, V> {
result.putAll(findValueSet(trieNode, topicArray, level + 1, maxLevel, false));
}
//match the #
- TrieNode jinMatch = currentNode.children.get(Constants.JINFLAG);
+ TrieNode jinMatch = currentNode.children.get(Constants.NUMBER_SIGN);
if (jinMatch != null) {
result.putAll(findValueSet(jinMatch, topicArray, level + 1, maxLevel, true));
}
//match the +
- TrieNode jiaMatch = currentNode.children.get(Constants.ADDFLAG);
+ TrieNode jiaMatch = currentNode.children.get(Constants.PLUS_SIGN);
if (jiaMatch != null) {
result.putAll(findValueSet(jiaMatch, topicArray, level + 1, maxLevel, false));
}
return result;
} else {
//match the #
- TrieNode jinMatch = currentNode.children.get(Constants.JINFLAG);
+ TrieNode jinMatch = currentNode.children.get(Constants.NUMBER_SIGN);
if (jinMatch != null) {
result.putAll(findValueSet(jinMatch, topicArray, level + 1, maxLevel, true));
}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
index 8a618b5..91cb875 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
@@ -130,7 +130,7 @@ public class TopicUtils {
public static boolean isWildCard(String topicFilter) {
return topicFilter != null &&
- (topicFilter.contains(Constants.JINFLAG) || topicFilter.contains(Constants.ADDFLAG));
+ (topicFilter.contains(Constants.NUMBER_SIGN) || topicFilter.contains(Constants.PLUS_SIGN));
}
public static boolean isMatch(String topic, String topicFilter) {
@@ -150,20 +150,21 @@ public class TopicUtils {
for (int i = 0; i < minTopicLength; i++) {
String sourceTopic = subscribeTopics[i];
- if (!Constants.JINFLAG.equals(sourceTopic) &&
- !Constants.ADDFLAG.equals(sourceTopic)) {
+ if (!isWildCard(sourceTopic)) {
if (!sourceTopic.equals(messageTopics[i])) {
return false;
}
}
// multi level
- if (Constants.JINFLAG.equals(sourceTopic)) {
- return true;
+ // [MQTT-4.7.1-2] In either case '#' MUST be the last character specified in the Topic Filter
+ // and "t/t1#" is invalid
+ if (Constants.NUMBER_SIGN.equals(sourceTopic)) {
+ return i == sourceTopicLength - 1;
}
boolean last = i == minTopicLength - 1 &&
(sourceTopicLength == targetTopicLength ||
(sourceTopicLength == targetTopicLength + 1 &&
- Constants.JINFLAG.equals(subscribeTopics[sourceTopicLength - 1])
+ Constants.NUMBER_SIGN.equals(subscribeTopics[sourceTopicLength - 1])
)
);
if (last) {
@@ -184,10 +185,4 @@ public class TopicUtils {
public static String wrapP2pLmq(String clientId) {
return normalizeTopic(Constants.P2P + clientId);
}
-
- public static void main(String[] args) {
- String topic = "/t/t1/t2";
- String topicFilter = "/t/t1/t2";
- System.out.println(TopicUtils.isMatch(topic, topicFilter));
- }
}
diff --git a/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/TestTopicUtils.java b/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/TestTopicUtils.java
new file mode 100644
index 0000000..c30906f
--- /dev/null
+++ b/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/TestTopicUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.common.test;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.mqtt.common.model.Trie;
+import org.apache.rocketmq.mqtt.common.util.TopicUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTopicUtils {
+
+ @Test
+ public void testTopicMatch() {
+ String topic = "t/t1/t2/";
+ String topicFilter = "t/t1/t2/";
+ Assert.assertTrue(TopicUtils.isMatch(topic, topicFilter));
+
+ topicFilter = "t/#/";
+ Assert.assertTrue(TopicUtils.isMatch(topic, topicFilter));
+
+ topicFilter = "t/t1/+/";
+ Assert.assertTrue(TopicUtils.isMatch(topic, topicFilter));
+
+ topicFilter = "t/+/#/";
+ Assert.assertTrue(TopicUtils.isMatch(topic, topicFilter));
+
+ topicFilter = "t/#/t2/";
+ Assert.assertFalse(TopicUtils.isMatch(topic, topicFilter));
+
+ topicFilter = "t/#/+/";
+ Assert.assertFalse(TopicUtils.isMatch(topic, topicFilter));
+
+ topicFilter = "t/t1/t2/t3/";
+ Assert.assertFalse(TopicUtils.isMatch(topic, topicFilter));
+
+ topicFilter = "t/t1/#/t3/";
+ Assert.assertFalse(TopicUtils.isMatch(topic, topicFilter));
+
+ topicFilter = "t/t1#/";
+ Assert.assertFalse(TopicUtils.isMatch(topic, topicFilter));
+
+
+ }
+}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
index 9c5ef3d..849e97e 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
@@ -61,9 +61,7 @@ public class InFlyCache {
public void put(CacheType cacheType, String channelId, int mqttMsgId) {
ConcurrentMap<String, Set<Integer>> cache = whichCache(cacheType);
- if (!cache.containsKey(channelId)) {
- cache.putIfAbsent(channelId, new HashSet<>());
- }
+ cache.putIfAbsent(channelId, new HashSet<>());
Set<Integer> idCache = cache.get(channelId);
if (idCache == null) {
return;
@@ -111,9 +109,7 @@ public class InFlyCache {
pendingDown.setSubscription(subscription);
pendingDown.setQueue(queue);
pendingDown.setSeqId(message.getOffset());
- if (!cache.containsKey(channelId)) {
- cache.putIfAbsent(channelId, new ConcurrentHashMap<>(16));
- }
+ cache.putIfAbsent(channelId, new ConcurrentHashMap<>(16));
cache.get(channelId).put(mqttMsgId, pendingDown);
return pendingDown;
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/match/MatchAction.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/match/MatchAction.java
index d26e94e..5b509d3 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/match/MatchAction.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/match/MatchAction.java
@@ -120,9 +120,7 @@ public class MatchAction {
}
synchronized (topicCache) {
- if (!topicCache.containsKey(topicFilter)) {
- topicCache.putIfAbsent(topicFilter, new HashSet<>());
- }
+ topicCache.putIfAbsent(topicFilter, new HashSet<>());
topicCache.get(topicFilter).add(channelId);
}
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
index 594e690..e40733e 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
@@ -99,9 +99,7 @@ public class WildcardManager {
queueNames.add(pubTopic);
Set<String> wildcards = matchWildcards(pubTopic);
if (wildcards != null && !wildcards.isEmpty()) {
- for (String wildcard : wildcards) {
- queueNames.add(wildcard);
- }
+ queueNames.addAll(wildcards);
}
}
return queueNames;
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqConsumer.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqConsumer.java
index 6f170e1..c576479 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqConsumer.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqConsumer.java
@@ -40,11 +40,11 @@ public class MqConsumer {
defaultMQPushConsumer.setConsumeThreadMin(Integer.valueOf((String)properties.get("threadNum")));
defaultMQPushConsumer.setConsumeThreadMax(Integer.valueOf((String)properties.get("threadNum")));
}
- defaultMQPushConsumer.setInstanceName(this.buildIntanceName());
+ defaultMQPushConsumer.setInstanceName(this.buildInstanceName());
defaultMQPushConsumer.setVipChannelEnabled(false);
}
- public String buildIntanceName() {
+ public String buildInstanceName() {
return Integer.toString(UtilAll.getPid())
+ "#" + System.nanoTime();
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqFactory.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqFactory.java
index 64962db..16de0ba 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqFactory.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqFactory.java
@@ -60,7 +60,7 @@ public class MqFactory {
public static DefaultMQProducer buildDefaultMQProducer(String group, String nameSrv) {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setNamesrvAddr(nameSrv);
- defaultMQProducer.setInstanceName(buildIntanceName());
+ defaultMQProducer.setInstanceName(buildInstanceName());
defaultMQProducer.setVipChannelEnabled(false);
defaultMQProducer.setProducerGroup(group);
return defaultMQProducer;
@@ -76,7 +76,7 @@ public class MqFactory {
defaultMQPushConsumer.setConsumeThreadMin(Integer.valueOf((String) properties.get("threadNum")));
defaultMQPushConsumer.setConsumeThreadMax(Integer.valueOf((String) properties.get("threadNum")));
}
- defaultMQPushConsumer.setInstanceName(buildIntanceName());
+ defaultMQPushConsumer.setInstanceName(buildInstanceName());
defaultMQPushConsumer.setVipChannelEnabled(false);
defaultMQPushConsumer.setConsumerGroup(group);
if (messageListener instanceof MessageListenerOrderly) {
@@ -90,7 +90,7 @@ public class MqFactory {
public static DefaultMQPullConsumer buildDefaultMQPullConsumer(String group, String nameSrv) {
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer();
defaultMQPullConsumer.setNamesrvAddr(nameSrv);
- defaultMQPullConsumer.setInstanceName(buildIntanceName());
+ defaultMQPullConsumer.setInstanceName(buildInstanceName());
defaultMQPullConsumer.setVipChannelEnabled(false);
defaultMQPullConsumer.setConsumerGroup(group);
return defaultMQPullConsumer;
@@ -104,7 +104,7 @@ public class MqFactory {
return defaultMQAdminExt;
}
- public static String buildIntanceName() {
+ public static String buildInstanceName() {
return Integer.toString(UtilAll.getPid())
+ "#" + System.nanoTime();
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqProducer.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqProducer.java
index 0c3bc76..d4b479b 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqProducer.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqProducer.java
@@ -27,16 +27,16 @@ import java.util.Properties;
public class MqProducer {
- private DefaultMQProducer defaultMQProducer;
+ private final DefaultMQProducer defaultMQProducer;
public MqProducer(Properties properties) {
defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setNamesrvAddr(properties.getProperty("NAMESRV_ADDR"));
- defaultMQProducer.setInstanceName(buildIntanceName());
+ defaultMQProducer.setInstanceName(buildInstanceName());
defaultMQProducer.setVipChannelEnabled(false);
}
- public String buildIntanceName() {
+ public String buildInstanceName() {
return Integer.toString(UtilAll.getPid())
+ "#" + System.nanoTime();
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqPullConsumer.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqPullConsumer.java
index 8748cc7..f39b627 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqPullConsumer.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/mq/MqPullConsumer.java
@@ -30,11 +30,11 @@ public class MqPullConsumer {
public MqPullConsumer(Properties properties) {
defaultMQPullConsumer = new DefaultMQPullConsumer();
defaultMQPullConsumer.setNamesrvAddr(properties.getProperty("NAMESRV_ADDR"));
- defaultMQPullConsumer.setInstanceName(this.buildIntanceName());
+ defaultMQPullConsumer.setInstanceName(this.buildInstanceName());
defaultMQPullConsumer.setVipChannelEnabled(false);
}
- public String buildIntanceName() {
+ public String buildInstanceName() {
return Integer.toString(UtilAll.getPid())
+ "#" + System.nanoTime();
}