You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/03/16 09:57:33 UTC
[rocketmq-mqtt] branch main updated: [ISSUE #33] some thread safety problems
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new 2955536 [ISSUE #33] some thread safety problems
2955536 is described below
commit 295553678c1cd649675652989f9af8261b02c47b
Author: tianliuliu <64...@qq.com>
AuthorDate: Wed Mar 16 17:57:26 2022 +0800
[ISSUE #33] some thread safety problems
[ISSUE #33] some thread safety problems
---
.../java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java | 5 ++++-
.../main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java | 5 ++++-
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java
index 86ac2a8..9b32ce9 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java
@@ -230,7 +230,10 @@ public class RetryDriver {
Map<Integer, RetryMessage> noWaitRetryMsgMap = sessionNoWaitRetryMsgMap.get(channelId);
if (noWaitRetryMsgMap == null) {
noWaitRetryMsgMap = new ConcurrentHashMap<>(2);
- sessionNoWaitRetryMsgMap.putIfAbsent(channelId, noWaitRetryMsgMap);
+ Map<Integer, RetryMessage> old = sessionNoWaitRetryMsgMap.putIfAbsent(channelId, noWaitRetryMsgMap);
+ if (old != null) {
+ noWaitRetryMsgMap = old;
+ }
}
if (!subscription.isRetry() &&
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index 2ad887f..01c25af 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -223,7 +223,10 @@ public class NotifyManager {
AtomicInteger nodeFailCount = nodeFail.get(node);
if (nodeFailCount == null) {
nodeFailCount = new AtomicInteger();
- nodeFail.putIfAbsent(node, nodeFailCount);
+ AtomicInteger old = nodeFail.putIfAbsent(node, nodeFailCount);
+ if (old != null) {
+ nodeFailCount = old;
+ }
}
if (nodeFailCount.get() > NODE_FAIL_MAX_NUM) {
sendEventRetryMsg(messageEvents, 1, node, 0);