You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/03/08 10:22:35 UTC
[rocketmq-mqtt] branch main updated: [ISSUE #6] Some code can be Optimize (#7)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new 87c1ca7 [ISSUE #6] Some code can be Optimize (#7)
87c1ca7 is described below
commit 87c1ca7c9441e504c7c801b582c24cb805a04125
Author: tianliuliu <64...@qq.com>
AuthorDate: Tue Mar 8 18:06:47 2022 +0800
[ISSUE #6] Some code can be Optimize (#7)
* Optimize duplicated code
* Optimize duplicated code and add unit test
* Optimize duplicated code and add unit test 1
---
.../apache/rocketmq/mqtt/ds/meta/WildcardManager.java | 13 +++++--------
.../rocketmq/mqtt/ds/test/TestWildcardManager.java | 19 +++++++++++++++++++
2 files changed, 24 insertions(+), 8 deletions(-)
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
index 545ea1f..7c137c8 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
@@ -88,20 +88,17 @@ public class WildcardManager {
}
public Set<String> matchQueueSetByMsgTopic(String pubTopic, String namespace) {
+ Set<String> queueNames = new HashSet<>();
if (StringUtils.isBlank(pubTopic)) {
- return new HashSet<>();
+ return queueNames;
}
- String queueName = pubTopic;
MqttTopic mqttTopic = TopicUtils.decode(pubTopic);
String secondTopic = TopicUtils.normalizeSecondTopic(mqttTopic.getSecondTopic());
if (TopicUtils.isP2P(secondTopic)) {
String p2Peer = TopicUtils.getP2Peer(mqttTopic, namespace);
- queueName = TopicUtils.getP2pTopic(p2Peer);
- }
- Set<String> queueNames = new HashSet<>();
- queueNames.add(queueName);
-
- if (!TopicUtils.isP2P(secondTopic)) {
+ queueNames.add(TopicUtils.getP2pTopic(p2Peer));
+ } else {
+ queueNames.add(pubTopic);
Set<String> wildcards = matchWildcards(pubTopic);
if (wildcards != null && !wildcards.isEmpty()) {
for (String wildcard : wildcards) {
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestWildcardManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestWildcardManager.java
index a13cb88..7955c4b 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestWildcardManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestWildcardManager.java
@@ -53,6 +53,25 @@ public class TestWildcardManager {
Set<String> set = wildcardManager.matchQueueSetByMsgTopic(TopicUtils.normalizeTopic("test/a"),"");
Assert.assertTrue(set.contains(TopicUtils.normalizeTopic("test/+")));
+ Assert.assertTrue(set.contains(TopicUtils.normalizeTopic("test/a")));
+ Assert.assertTrue(set.size() == 2);
+ }
+
+ @Test
+ public void testForP2P() throws IllegalAccessException, InterruptedException {
+ WildcardManager wildcardManager = new WildcardManager();
+ MetaPersistManager metaPersistManager = mock(MetaPersistManager.class);
+ FieldUtils.writeDeclaredField(wildcardManager, "metaPersistManager", metaPersistManager, true);
+
+ when(metaPersistManager.getAllFirstTopics()).thenReturn(new HashSet<>(Arrays.asList("test")));
+ when(metaPersistManager.getWildcards(any())).thenReturn(new HashSet<>(Arrays.asList(TopicUtils.normalizeTopic("test/+"))));
+
+ wildcardManager.init();
+ Thread.sleep(1000L);
+
+ Set<String> set = wildcardManager.matchQueueSetByMsgTopic(TopicUtils.normalizeTopic("test/p2p/GID_sdasa@@@2222"),"");
+ Assert.assertTrue(set.contains(TopicUtils.normalizeTopic("/p2p/GID_sdasa@@@2222/")));
+ Assert.assertTrue(set.size() == 1);
}
}