You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/03/08 10:22:35 UTC

[rocketmq-mqtt] branch main updated: [ISSUE #6] Some code can be Optimize (#7)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/main by this push:
     new 87c1ca7  [ISSUE #6] Some code can be Optimize (#7)
87c1ca7 is described below

commit 87c1ca7c9441e504c7c801b582c24cb805a04125
Author: tianliuliu <64...@qq.com>
AuthorDate: Tue Mar 8 18:06:47 2022 +0800

    [ISSUE #6] Some code can be Optimize (#7)
    
    * for optimize same code
    
    * for optimize same code and add unit test
    
    * for optimize same code and add unit test 1
---
 .../apache/rocketmq/mqtt/ds/meta/WildcardManager.java | 13 +++++--------
 .../rocketmq/mqtt/ds/test/TestWildcardManager.java    | 19 +++++++++++++++++++
 2 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
index 545ea1f..7c137c8 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WildcardManager.java
@@ -88,20 +88,17 @@ public class WildcardManager {
     }
 
     public Set<String> matchQueueSetByMsgTopic(String pubTopic, String namespace) {
+        Set<String> queueNames = new HashSet<>();
         if (StringUtils.isBlank(pubTopic)) {
-            return new HashSet<>();
+            return queueNames;
         }
-        String queueName = pubTopic;
         MqttTopic mqttTopic = TopicUtils.decode(pubTopic);
         String secondTopic = TopicUtils.normalizeSecondTopic(mqttTopic.getSecondTopic());
         if (TopicUtils.isP2P(secondTopic)) {
             String p2Peer = TopicUtils.getP2Peer(mqttTopic, namespace);
-            queueName = TopicUtils.getP2pTopic(p2Peer);
-        }
-        Set<String> queueNames = new HashSet<>();
-        queueNames.add(queueName);
-
-        if (!TopicUtils.isP2P(secondTopic)) {
+            queueNames.add(TopicUtils.getP2pTopic(p2Peer));
+        } else {
+            queueNames.add(pubTopic);
             Set<String> wildcards = matchWildcards(pubTopic);
             if (wildcards != null && !wildcards.isEmpty()) {
                 for (String wildcard : wildcards) {
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestWildcardManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestWildcardManager.java
index a13cb88..7955c4b 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestWildcardManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestWildcardManager.java
@@ -53,6 +53,25 @@ public class TestWildcardManager {
 
         Set<String> set =  wildcardManager.matchQueueSetByMsgTopic(TopicUtils.normalizeTopic("test/a"),"");
         Assert.assertTrue(set.contains(TopicUtils.normalizeTopic("test/+")));
+        Assert.assertTrue(set.contains(TopicUtils.normalizeTopic("test/a")));
+        Assert.assertTrue(set.size() == 2);
+    }
+
+    @Test
+    public void testForP2P() throws IllegalAccessException, InterruptedException {
+        WildcardManager wildcardManager = new WildcardManager();
+        MetaPersistManager metaPersistManager = mock(MetaPersistManager.class);
+        FieldUtils.writeDeclaredField(wildcardManager, "metaPersistManager", metaPersistManager, true);
+
+        when(metaPersistManager.getAllFirstTopics()).thenReturn(new HashSet<>(Arrays.asList("test")));
+        when(metaPersistManager.getWildcards(any())).thenReturn(new HashSet<>(Arrays.asList(TopicUtils.normalizeTopic("test/+"))));
+
+        wildcardManager.init();
+        Thread.sleep(1000L);
+
+        Set<String> set =  wildcardManager.matchQueueSetByMsgTopic(TopicUtils.normalizeTopic("test/p2p/GID_sdasa@@@2222"),"");
+        Assert.assertTrue(set.contains(TopicUtils.normalizeTopic("/p2p/GID_sdasa@@@2222/")));
+        Assert.assertTrue(set.size() == 1);
     }
 
 }