You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ti...@apache.org on 2022/04/26 03:50:21 UTC
[rocketmq-mqtt] branch main updated: continuously improve codeCov for #22
This is an automated email from the ASF dual-hosted git repository.
tianliuliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new 6e54f31 continuously improve codeCov for #22
new 011fbb3 Merge pull request #82 from YxAc/continuously_improve_codeCov
6e54f31 is described below
commit 6e54f31700fa97165e84c92ad405546953522d91
Author: AhaThinking <ah...@AhaThinkingdeMacBook-Pro.local>
AuthorDate: Sat Apr 23 22:05:27 2022 +0800
continuously improve codeCov for #22
---
.../rocketmq/mqtt/cs/session/infly/InFlyCache.java | 6 +--
.../rocketmq/mqtt/cs/session/infly/MqttMsgId.java | 13 ++---
.../rocketmq/mqtt/cs/test/TestInFlyCache.java | 61 +++++++++++++++++++---
.../rocketmq/mqtt/cs/test/TestMqttMsgId.java | 14 ++++-
4 files changed, 71 insertions(+), 23 deletions(-)
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
index 849e97e..f947d82 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/InFlyCache.java
@@ -41,7 +41,7 @@ public class InFlyCache {
private ConcurrentMap<String, Set<Integer>> pubCache = new ConcurrentHashMap<>(128);
private PendingDownCache pendingDownCache = new PendingDownCache();
- public void cleanResource(String clientId,String channelId) {
+ public void cleanResource(String clientId, String channelId) {
pubCache.remove(channelId);
pendingDownCache.clear(clientId, channelId);
}
@@ -67,7 +67,7 @@ public class InFlyCache {
return;
}
synchronized (idCache) {
- cache.get(channelId).add(mqttMsgId);
+ idCache.add(mqttMsgId);
}
}
@@ -125,7 +125,6 @@ public class InFlyCache {
Map<Integer, PendingDown> map = cache.get(channelId);
if (map != null) {
return map.remove(mqttMsgId);
-
}
return null;
}
@@ -134,7 +133,6 @@ public class InFlyCache {
Map<Integer, PendingDown> map = cache.get(channelId);
if (map != null) {
return map.get(mqttMsgId);
-
}
return null;
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
index 03e2a35..a9bb53b 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
@@ -59,22 +59,17 @@ public class MqttMsgId {
MsgIdEntry msgIdEntry = hashMsgID(clientId);
synchronized (msgIdEntry) {
int startingMessageId = msgIdEntry.nextMsgId;
- int loopCount = 0;
- int maxLoopCount = 1;
do {
msgIdEntry.nextMsgId++;
if (msgIdEntry.nextMsgId > MAX_MSG_ID) {
msgIdEntry.nextMsgId = MIN_MSG_ID;
}
if (msgIdEntry.nextMsgId == startingMessageId) {
- loopCount++;
- if (loopCount >= maxLoopCount) {
- msgIdEntry.inUseMsgIds.clear();
- break;
- }
+ msgIdEntry.inUseMsgIds.clear();
+ break;
}
- } while (msgIdEntry.inUseMsgIds.containsKey(new Integer(msgIdEntry.nextMsgId)));
- Integer id = new Integer(msgIdEntry.nextMsgId);
+ } while (msgIdEntry.inUseMsgIds.containsKey(msgIdEntry.nextMsgId));
+ Integer id = msgIdEntry.nextMsgId;
msgIdEntry.inUseMsgIds.put(id, id);
return msgIdEntry.nextMsgId;
}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestInFlyCache.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestInFlyCache.java
index a3fb6e8..e341caa 100644
--- a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestInFlyCache.java
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestInFlyCache.java
@@ -19,31 +19,76 @@
package org.apache.rocketmq.mqtt.cs.test;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.Subscription;
import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
+import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
@RunWith(MockitoJUnitRunner.class)
public class TestInFlyCache {
+ private final String clientId = "testInFlyCache";
+ private final String channelId = "testInFlyCache";
+ private final int msgId = 1;
+
+ private InFlyCache inFlyCache;
+
+ @Mock
+ private Subscription subscription;
+
+ @Mock
+ private Queue queue;
+
+ @Mock
+ private Message message;
+
+ @Spy
+ private MqttMsgId mqttMsgId;
+
+ @Before
+ public void setUp() throws Exception {
+ inFlyCache = new InFlyCache();
+ mqttMsgId.init();
+ FieldUtils.writeDeclaredField(inFlyCache, "mqttMsgId", mqttMsgId, true);
+ }
@Test
public void test() {
- InFlyCache inFlyCache = new InFlyCache();
- inFlyCache.put(InFlyCache.CacheType.PUB, "test", 1);
- Assert.assertTrue(inFlyCache.contains(InFlyCache.CacheType.PUB, "test", 1));
+ // cache put
+ inFlyCache.put(InFlyCache.CacheType.PUB, channelId, msgId);
+ Assert.assertTrue(inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, msgId));
+ InFlyCache.PendingDownCache pendingDownCache = inFlyCache.getPendingDownCache();
+ pendingDownCache.put(channelId, msgId, subscription, queue, message);
+ assertEquals(subscription, pendingDownCache.get(channelId, msgId).getSubscription());
+ assertEquals(queue, pendingDownCache.get(channelId, msgId).getQueue());
+ assertEquals(1, pendingDownCache.all(channelId).size());
- inFlyCache.getPendingDownCache().put("test", 1, mock(Subscription.class), mock(Queue.class), mock(Message.class));
- Assert.assertTrue(null != inFlyCache.getPendingDownCache().get("test", 1));
+ // cache remove
+ pendingDownCache.remove(channelId, msgId);
+ Assert.assertNull(pendingDownCache.get(channelId, msgId));
+ inFlyCache.remove(InFlyCache.CacheType.PUB, channelId, msgId);
+ Assert.assertFalse(inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, msgId));
- inFlyCache.getPendingDownCache().remove("test", 1);
- Assert.assertTrue(null == inFlyCache.getPendingDownCache().get("test", 1));
+ // test cleanResource
+ inFlyCache.put(InFlyCache.CacheType.PUB, channelId, msgId);
+ pendingDownCache.put(channelId, msgId, subscription, queue, message);
+ inFlyCache.cleanResource(clientId, channelId);
+ Assert.assertNull(pendingDownCache.get(channelId, msgId));
+ Assert.assertFalse(inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, msgId));
+ }
+ @Test (expected = RuntimeException.class)
+ public void testInvalidCacheType() {
+ inFlyCache.put(InFlyCache.CacheType.valueOf("SUB"), channelId, msgId);
}
}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestMqttMsgId.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestMqttMsgId.java
index b4efbc4..97ed83f 100644
--- a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestMqttMsgId.java
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestMqttMsgId.java
@@ -32,7 +32,17 @@ public class TestMqttMsgId {
public void test() {
MqttMsgId mqttMsgId = new MqttMsgId();
mqttMsgId.init();
- int id = mqttMsgId.nextId("test");
- Assert.assertFalse(mqttMsgId.nextId("test") == id);
+
+ String clientId = "testMsgId";
+ int loopCount = 0, maxMsgId = 65535;
+ while (loopCount < maxMsgId) {
+ Assert.assertEquals(loopCount + 1, mqttMsgId.nextId(clientId));
+ loopCount++;
+ }
+ // all 65535 ids are now in use: the search wraps around to 'startingMessageId' (65535), which triggers 'inUseMsgIds.clear' and hands out 65535 again
+ Assert.assertEquals(loopCount, mqttMsgId.nextId(clientId));
+
+ mqttMsgId.releaseId(maxMsgId, "");
+ mqttMsgId.releaseId(maxMsgId, clientId);
}
}