You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2022/03/29 04:32:06 UTC

[rocketmq-mqtt] 38/43: [ISSUE #33] some thread safety problems

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git

commit 295553678c1cd649675652989f9af8261b02c47b
Author: tianliuliu <64...@qq.com>
AuthorDate: Wed Mar 16 17:57:26 2022 +0800

    [ISSUE #33] some thread safety problems
    
    [ISSUE #33] some thread safety problems
---
 .../java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java  | 5 ++++-
 .../main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java  | 5 ++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java
index 86ac2a8..9b32ce9 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java
@@ -230,7 +230,10 @@ public class RetryDriver {
         Map<Integer, RetryMessage> noWaitRetryMsgMap = sessionNoWaitRetryMsgMap.get(channelId);
         if (noWaitRetryMsgMap == null) {
             noWaitRetryMsgMap = new ConcurrentHashMap<>(2);
-            sessionNoWaitRetryMsgMap.putIfAbsent(channelId, noWaitRetryMsgMap);
+            Map<Integer, RetryMessage> old = sessionNoWaitRetryMsgMap.putIfAbsent(channelId, noWaitRetryMsgMap);
+            if (old != null) {
+                noWaitRetryMsgMap = old;
+            }
         }
 
         if (!subscription.isRetry() &&
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index 2ad887f..01c25af 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -223,7 +223,10 @@ public class NotifyManager {
                 AtomicInteger nodeFailCount = nodeFail.get(node);
                 if (nodeFailCount == null) {
                     nodeFailCount = new AtomicInteger();
-                    nodeFail.putIfAbsent(node, nodeFailCount);
+                    AtomicInteger old = nodeFail.putIfAbsent(node, nodeFailCount);
+                    if (old != null) {
+                        nodeFailCount = old;
+                    }
                 }
                 if (nodeFailCount.get() > NODE_FAIL_MAX_NUM) {
                     sendEventRetryMsg(messageEvents, 1, node, 0);