You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ti...@apache.org on 2022/04/26 03:50:21 UTC

[rocketmq-mqtt] branch main updated: continuously improve codeCov for #22

This is an automated email from the ASF dual-hosted git repository.

tianliuliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e54f31  continuously improve codeCov for #22
     new 011fbb3  Merge pull request #82 from YxAc/continuously_improve_codeCov
6e54f31 is described below

commit 6e54f31700fa97165e84c92ad405546953522d91
Author: AhaThinking <ah...@AhaThinkingdeMacBook-Pro.local>
AuthorDate: Sat Apr 23 22:05:27 2022 +0800

    continuously improve codeCov for #22
---
 .../rocketmq/mqtt/cs/session/infly/InFlyCache.java |  6 +--
 .../rocketmq/mqtt/cs/session/infly/MqttMsgId.java  | 13 ++---
 .../rocketmq/mqtt/cs/test/TestInFlyCache.java      | 61 +++++++++++++++++++---
 .../rocketmq/mqtt/cs/test/TestMqttMsgId.java       | 14 ++++-
 4 files changed, 71 insertions(+), 23 deletions(-)

diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
index 849e97e..f947d82 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
@@ -41,7 +41,7 @@ public class InFlyCache {
     private ConcurrentMap<String, Set<Integer>> pubCache = new ConcurrentHashMap<>(128);
     private PendingDownCache pendingDownCache = new PendingDownCache();
 
-    public void cleanResource(String clientId,String channelId) {
+    public void cleanResource(String clientId, String channelId) {
         pubCache.remove(channelId);
         pendingDownCache.clear(clientId, channelId);
     }
@@ -67,7 +67,7 @@ public class InFlyCache {
             return;
         }
         synchronized (idCache) {
-            cache.get(channelId).add(mqttMsgId);
+            idCache.add(mqttMsgId);
         }
     }
 
@@ -125,7 +125,6 @@ public class InFlyCache {
             Map<Integer, PendingDown> map = cache.get(channelId);
             if (map != null) {
                 return map.remove(mqttMsgId);
-
             }
             return null;
         }
@@ -134,7 +133,6 @@ public class InFlyCache {
             Map<Integer, PendingDown> map = cache.get(channelId);
             if (map != null) {
                 return map.get(mqttMsgId);
-
             }
             return null;
         }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
index 03e2a35..a9bb53b 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
@@ -59,22 +59,17 @@ public class MqttMsgId {
         MsgIdEntry msgIdEntry = hashMsgID(clientId);
         synchronized (msgIdEntry) {
             int startingMessageId = msgIdEntry.nextMsgId;
-            int loopCount = 0;
-            int maxLoopCount = 1;
             do {
                 msgIdEntry.nextMsgId++;
                 if (msgIdEntry.nextMsgId > MAX_MSG_ID) {
                     msgIdEntry.nextMsgId = MIN_MSG_ID;
                 }
                 if (msgIdEntry.nextMsgId == startingMessageId) {
-                    loopCount++;
-                    if (loopCount >= maxLoopCount) {
-                        msgIdEntry.inUseMsgIds.clear();
-                        break;
-                    }
+                    msgIdEntry.inUseMsgIds.clear();
+                    break;
                 }
-            } while (msgIdEntry.inUseMsgIds.containsKey(new Integer(msgIdEntry.nextMsgId)));
-            Integer id = new Integer(msgIdEntry.nextMsgId);
+            } while (msgIdEntry.inUseMsgIds.containsKey(msgIdEntry.nextMsgId));
+            Integer id = msgIdEntry.nextMsgId;
             msgIdEntry.inUseMsgIds.put(id, id);
             return msgIdEntry.nextMsgId;
         }
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestInFlyCache.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestInFlyCache.java
index a3fb6e8..e341caa 100644
--- a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestInFlyCache.java
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestInFlyCache.java
@@ -19,31 +19,76 @@
 
 package org.apache.rocketmq.mqtt.cs.test;
 
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.mqtt.common.model.Message;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.Subscription;
 import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
+import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TestInFlyCache {
+    private final String clientId = "testInFlyCache";
+    private final String channelId = "testInFlyCache";
+    private final int msgId = 1;
+
+    private InFlyCache inFlyCache;
+
+    @Mock
+    private Subscription subscription;
+
+    @Mock
+    private Queue queue;
+
+    @Mock
+    private Message message;
+
+    @Spy
+    private MqttMsgId mqttMsgId;
+
+    @Before
+    public void setUp() throws Exception {
+        inFlyCache = new InFlyCache();
+        mqttMsgId.init();
+        FieldUtils.writeDeclaredField(inFlyCache, "mqttMsgId", mqttMsgId, true);
+    }
 
     @Test
     public void test() {
-        InFlyCache inFlyCache = new InFlyCache();
-        inFlyCache.put(InFlyCache.CacheType.PUB, "test", 1);
-        Assert.assertTrue(inFlyCache.contains(InFlyCache.CacheType.PUB, "test", 1));
+        // cache put
+        inFlyCache.put(InFlyCache.CacheType.PUB, channelId, msgId);
+        Assert.assertTrue(inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, msgId));
+        InFlyCache.PendingDownCache pendingDownCache = inFlyCache.getPendingDownCache();
+        pendingDownCache.put(channelId, msgId, subscription, queue, message);
+        assertEquals(subscription, pendingDownCache.get(channelId, msgId).getSubscription());
+        assertEquals(queue, pendingDownCache.get(channelId, msgId).getQueue());
+        assertEquals(1, pendingDownCache.all(channelId).size());
 
-        inFlyCache.getPendingDownCache().put("test", 1, mock(Subscription.class), mock(Queue.class), mock(Message.class));
-        Assert.assertTrue(null != inFlyCache.getPendingDownCache().get("test", 1));
+        // cache remove
+        pendingDownCache.remove(channelId, msgId);
+        Assert.assertNull(pendingDownCache.get(channelId, msgId));
+        inFlyCache.remove(InFlyCache.CacheType.PUB, channelId, msgId);
+        Assert.assertFalse(inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, msgId));
 
-        inFlyCache.getPendingDownCache().remove("test", 1);
-        Assert.assertTrue(null == inFlyCache.getPendingDownCache().get("test", 1));
+        // test cleanResource
+        inFlyCache.put(InFlyCache.CacheType.PUB, channelId, msgId);
+        pendingDownCache.put(channelId, msgId, subscription, queue, message);
+        inFlyCache.cleanResource(clientId, channelId);
+        Assert.assertNull(pendingDownCache.get(channelId, msgId));
+        Assert.assertFalse(inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, msgId));
+    }
 
+    @Test (expected = RuntimeException.class)
+    public void testInvalidCacheType() {
+        inFlyCache.put(InFlyCache.CacheType.valueOf("SUB"), channelId, msgId);
     }
 }
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestMqttMsgId.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestMqttMsgId.java
index b4efbc4..97ed83f 100644
--- a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestMqttMsgId.java
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestMqttMsgId.java
@@ -32,7 +32,17 @@ public class TestMqttMsgId {
     public void test() {
         MqttMsgId mqttMsgId = new MqttMsgId();
         mqttMsgId.init();
-        int id = mqttMsgId.nextId("test");
-        Assert.assertFalse(mqttMsgId.nextId("test") == id);
+
+        String clientId = "testMsgId";
+        int loopCount = 0, maxMsgId = 65535;
+        while (loopCount < maxMsgId) {
+            Assert.assertEquals(loopCount + 1, mqttMsgId.nextId(clientId));
+            loopCount++;
+        }
+        // all ids are now in use: the scan wraps back to 'startingMessageId' (65535), which triggers 'inUseMsgIds.clear'
+        Assert.assertEquals(loopCount, mqttMsgId.nextId(clientId));
+
+        mqttMsgId.releaseId(maxMsgId, "");
+        mqttMsgId.releaseId(maxMsgId, clientId);
     }
 }