You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2022/03/29 04:31:53 UTC
[rocketmq-mqtt] 25/43: [ISSUE #17] check upstreamHookResult before entering the message handler's main logic
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
commit 22b850d003dc8691430786ca260403512260025c
Author: fangchengjin <fa...@xiaomi.com>
AuthorDate: Fri Mar 11 10:16:07 2022 +0800
[ISSUE #17] check upstreamHookResult before entering the message handler's main logic
---
.../cs/protocol/mqtt/handler/MqttConnectHandler.java | 17 ++++++++++-------
.../cs/protocol/mqtt/handler/MqttPublishHandler.java | 11 ++++++-----
.../cs/protocol/mqtt/handler/MqttSubscribeHandler.java | 13 ++++++-------
3 files changed, 22 insertions(+), 19 deletions(-)
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
index 7933b03..ecf3390 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
@@ -68,13 +68,7 @@ public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage>
ChannelInfo.setKeepLive(channel, variableHeader.keepAliveTimeSeconds());
ChannelInfo.setClientId(channel, connectMessage.payload().clientIdentifier());
ChannelInfo.setCleanSessionFlag(channel, variableHeader.isCleanSession());
- CompletableFuture<Void> future = new CompletableFuture<>();
- ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);
- scheduler.schedule(() -> {
- if (!future.isDone()) {
- future.complete(null);
- }
- }, 1, TimeUnit.SECONDS);
+
String remark = upstreamHookResult.getRemark();
if (!upstreamHookResult.isSuccess()) {
byte connAckCode = (byte) upstreamHookResult.getSubCode();
@@ -87,6 +81,15 @@ public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage>
channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
return;
}
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);
+ scheduler.schedule(() -> {
+ if (!future.isDone()) {
+ future.complete(null);
+ }
+ }, 1, TimeUnit.SECONDS);
+
try {
MqttConnAckMessage mqttConnAckMessage = getMqttConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED);
future.thenAccept(aVoid -> {
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
index 8c7bae0..a8d17d2 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
@@ -65,6 +65,12 @@ public class MqttPublishHandler implements MqttPacketHandler<MqttPublishMessage>
final MqttPublishVariableHeader variableHeader = mqttMessage.variableHeader();
Channel channel = ctx.channel();
String channelId = ChannelInfo.getId(channel);
+
+ if (!upstreamHookResult.isSuccess()) {
+ channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, upstreamHookResult.getRemark());
+ return;
+ }
+
final boolean isQos2Message = isQos2Message(mqttMessage);
if (isQos2Message) {
if (inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, variableHeader.messageId())) {
@@ -72,11 +78,6 @@ public class MqttPublishHandler implements MqttPacketHandler<MqttPublishMessage>
return;
}
}
- String remark = upstreamHookResult.getRemark();
- if (!upstreamHookResult.isSuccess()) {
- channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
- return;
- }
doResponse(ctx, mqttMessage);
if (isQos2Message) {
inFlyCache.put(InFlyCache.CacheType.PUB, channelId, variableHeader.messageId());
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
index 3f3be34..59e167f 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
@@ -69,13 +69,17 @@ public class MqttSubscribeHandler implements MqttPacketHandler<MqttSubscribeMess
@Resource
private ConnectConf connectConf;
- private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("check_connect_future"));
-
+ private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("check_subscribe_future"));
@Override
public void doHandler(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMessage, HookResult upstreamHookResult) {
String clientId = ChannelInfo.getClientId(ctx.channel());
Channel channel = ctx.channel();
+ if (!upstreamHookResult.isSuccess()) {
+ channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, upstreamHookResult.getRemark());
+ return;
+ }
+
CompletableFuture<Void> future = new CompletableFuture<>();
ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_SUBSCRIBE, future);
scheduler.schedule(() -> {
@@ -83,11 +87,6 @@ public class MqttSubscribeHandler implements MqttPacketHandler<MqttSubscribeMess
future.complete(null);
}
},1,TimeUnit.SECONDS);
- String remark = upstreamHookResult.getRemark();
- if (!upstreamHookResult.isSuccess()) {
- channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
- return;
- }
try {
MqttSubscribePayload payload = mqttMessage.payload();
List<MqttTopicSubscription> mqttTopicSubscriptions = payload.topicSubscriptions();