You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/15 07:29:17 UTC
[rocketmq-mqtt] branch main updated: [ISSUE #31] Refactor some code and fix some bugs in high concurrency scenarios (#32)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new f795be8 [ISSUE #31] Refactor some code and fix some bugs in high concurrency scenarios (#32)
f795be8 is described below
commit f795be8a77c0178fcc5241e0b004a6f74285d361
Author: tianliuliu <64...@qq.com>
AuthorDate: Tue Mar 15 15:29:12 2022 +0800
[ISSUE #31] Refactor some code and fix some bugs in high concurrency scenarios (#32)
---
.../java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java | 4 ++--
.../java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java | 6 +++++-
.../org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java | 2 +-
.../rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java | 1 -
4 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
index 72b0f57..03e2a35 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/MqttMsgId.java
@@ -60,7 +60,7 @@ public class MqttMsgId {
synchronized (msgIdEntry) {
int startingMessageId = msgIdEntry.nextMsgId;
int loopCount = 0;
- int maxLoopCount = 2;
+ int maxLoopCount = 1;
do {
msgIdEntry.nextMsgId++;
if (msgIdEntry.nextMsgId > MAX_MSG_ID) {
@@ -69,7 +69,7 @@ public class MqttMsgId {
if (msgIdEntry.nextMsgId == startingMessageId) {
loopCount++;
if (loopCount >= maxLoopCount) {
- msgIdEntry.nextMsgId++;
+ msgIdEntry.inUseMsgIds.clear();
break;
}
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
index 4eb9349..f1308ff 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
@@ -136,6 +136,10 @@ public class PushAction {
}
public void rollNextByAck(Session session, int mqttId) {
+ if (session == null) {
+ return;
+ }
+ mqttMsgId.releaseId(mqttId, session.getClientId());
InFlyCache.PendingDown pendingDown = inFlyCache.getPendingDownCache().get(session.getChannelId(), mqttId);
if (pendingDown == null) {
return;
@@ -144,7 +148,7 @@ public class PushAction {
}
public void rollNext(Session session, int mqttId) {
- if (session == null || session.isDestroyed()) {
+ if (session == null) {
return;
}
mqttMsgId.releaseId(mqttId, session.getClientId());
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 88b42aa..f521c2b 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -180,7 +180,7 @@ public class LmqQueueStoreManager implements LmqQueueStore {
mqMessage.setTags(Constants.MQTT_TAG);
mqMessage.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
StringUtils.join(
- queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
+ queues.stream().map(s -> MixAll.LMQ_PREFIX + StringUtils.replace(s, "/", "%")).collect(Collectors.toSet()),
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
try {
long start = System.currentTimeMillis();
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
index 6d6cf03..b01ad56 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
@@ -66,7 +66,6 @@ public class PublishProcessor implements UpstreamProcessor {
Message message = MessageUtil.toMessage(mqttPublishMessage);
message.setMsgId(msgId);
message.setBornTimestamp(System.currentTimeMillis());
- message.setFirstTopic(mqttTopic.getFirstTopic());
CompletableFuture<StoreResult> storeResult = lmqQueueStore.putMessage(queueNames, message);
return storeResult.thenCompose(storeResult1 -> HookResult.newHookResult(HookResult.SUCCESS, null,
JSONObject.toJSONString(storeResult1).getBytes(StandardCharsets.UTF_8)));