You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2022/03/29 04:32:03 UTC

[rocketmq-mqtt] 35/43: [ISSUE #31] Refactor some code and fix some bugs in high concurrency scenarios (#32)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git

commit f795be8a77c0178fcc5241e0b004a6f74285d361
Author: tianliuliu <64...@qq.com>
AuthorDate: Tue Mar 15 15:29:12 2022 +0800

    [ISSUE #31] Refactor some code and fix some bugs in high concurrency scenarios (#32)
---
 .../java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java   | 4 ++--
 .../java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java  | 6 +++++-
 .../org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java     | 2 +-
 .../rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java       | 1 -
 4 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
index 72b0f57..03e2a35 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
@@ -60,7 +60,7 @@ public class MqttMsgId {
         synchronized (msgIdEntry) {
             int startingMessageId = msgIdEntry.nextMsgId;
             int loopCount = 0;
-            int maxLoopCount = 2;
+            int maxLoopCount = 1;
             do {
                 msgIdEntry.nextMsgId++;
                 if (msgIdEntry.nextMsgId > MAX_MSG_ID) {
@@ -69,7 +69,7 @@ public class MqttMsgId {
                 if (msgIdEntry.nextMsgId == startingMessageId) {
                     loopCount++;
                     if (loopCount >= maxLoopCount) {
-                        msgIdEntry.nextMsgId++;
+                        msgIdEntry.inUseMsgIds.clear();
                         break;
                     }
                 }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
index 4eb9349..f1308ff 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
@@ -136,6 +136,10 @@ public class PushAction {
     }
 
     public void rollNextByAck(Session session, int mqttId) {
+        if (session == null) {
+            return;
+        }
+        mqttMsgId.releaseId(mqttId, session.getClientId());
         InFlyCache.PendingDown pendingDown = inFlyCache.getPendingDownCache().get(session.getChannelId(), mqttId);
         if (pendingDown == null) {
             return;
@@ -144,7 +148,7 @@ public class PushAction {
     }
 
     public void rollNext(Session session, int mqttId) {
-        if (session == null || session.isDestroyed()) {
+        if (session == null) {
             return;
         }
         mqttMsgId.releaseId(mqttId, session.getClientId());
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 88b42aa..f521c2b 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -180,7 +180,7 @@ public class LmqQueueStoreManager implements LmqQueueStore {
         mqMessage.setTags(Constants.MQTT_TAG);
         mqMessage.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
                 StringUtils.join(
-                        queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
+                        queues.stream().map(s -> MixAll.LMQ_PREFIX + StringUtils.replace(s, "/", "%")).collect(Collectors.toSet()),
                         MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
         try {
             long start = System.currentTimeMillis();
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
index 6d6cf03..b01ad56 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
@@ -66,7 +66,6 @@ public class PublishProcessor implements UpstreamProcessor {
         Message message = MessageUtil.toMessage(mqttPublishMessage);
         message.setMsgId(msgId);
         message.setBornTimestamp(System.currentTimeMillis());
-        message.setFirstTopic(mqttTopic.getFirstTopic());
         CompletableFuture<StoreResult> storeResult = lmqQueueStore.putMessage(queueNames, message);
         return storeResult.thenCompose(storeResult1 -> HookResult.newHookResult(HookResult.SUCCESS, null,
                 JSONObject.toJSONString(storeResult1).getBytes(StandardCharsets.UTF_8)));