You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2022/03/29 04:31:53 UTC

[rocketmq-mqtt] 25/43: [ISSUE #17] check upstreamHookResult before entering message handler main logic

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git

commit 22b850d003dc8691430786ca260403512260025c
Author: fangchengjin <fa...@xiaomi.com>
AuthorDate: Fri Mar 11 10:16:07 2022 +0800

    [ISSUE #17] check upstreamHookResult before entering message handler main logic
---
 .../cs/protocol/mqtt/handler/MqttConnectHandler.java    | 17 ++++++++++-------
 .../cs/protocol/mqtt/handler/MqttPublishHandler.java    | 11 ++++++-----
 .../cs/protocol/mqtt/handler/MqttSubscribeHandler.java  | 13 ++++++-------
 3 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
index 7933b03..ecf3390 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
@@ -68,13 +68,7 @@ public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage>
         ChannelInfo.setKeepLive(channel, variableHeader.keepAliveTimeSeconds());
         ChannelInfo.setClientId(channel, connectMessage.payload().clientIdentifier());
         ChannelInfo.setCleanSessionFlag(channel, variableHeader.isCleanSession());
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);
-        scheduler.schedule(() -> {
-            if (!future.isDone()) {
-                future.complete(null);
-            }
-        }, 1, TimeUnit.SECONDS);
+
         String remark = upstreamHookResult.getRemark();
         if (!upstreamHookResult.isSuccess()) {
             byte connAckCode = (byte) upstreamHookResult.getSubCode();
@@ -87,6 +81,15 @@ public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage>
             channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
             return;
         }
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);
+        scheduler.schedule(() -> {
+            if (!future.isDone()) {
+                future.complete(null);
+            }
+        }, 1, TimeUnit.SECONDS);
+
         try {
             MqttConnAckMessage mqttConnAckMessage = getMqttConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED);
             future.thenAccept(aVoid -> {
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
index 8c7bae0..a8d17d2 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
@@ -65,6 +65,12 @@ public class MqttPublishHandler implements MqttPacketHandler<MqttPublishMessage>
         final MqttPublishVariableHeader variableHeader = mqttMessage.variableHeader();
         Channel channel = ctx.channel();
         String channelId = ChannelInfo.getId(channel);
+
+        if (!upstreamHookResult.isSuccess()) {
+            channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, upstreamHookResult.getRemark());
+            return;
+        }
+
         final boolean isQos2Message = isQos2Message(mqttMessage);
         if (isQos2Message) {
             if (inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, variableHeader.messageId())) {
@@ -72,11 +78,6 @@ public class MqttPublishHandler implements MqttPacketHandler<MqttPublishMessage>
                 return;
             }
         }
-        String remark = upstreamHookResult.getRemark();
-        if (!upstreamHookResult.isSuccess()) {
-            channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
-            return;
-        }
         doResponse(ctx, mqttMessage);
         if (isQos2Message) {
             inFlyCache.put(InFlyCache.CacheType.PUB, channelId, variableHeader.messageId());
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
index 3f3be34..59e167f 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
@@ -69,13 +69,17 @@ public class MqttSubscribeHandler implements MqttPacketHandler<MqttSubscribeMess
     @Resource
     private ConnectConf connectConf;
 
-    private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("check_connect_future"));
-
+    private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("check_subscribe_future"));
 
     @Override
     public void doHandler(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMessage, HookResult upstreamHookResult) {
         String clientId = ChannelInfo.getClientId(ctx.channel());
         Channel channel = ctx.channel();
+        if (!upstreamHookResult.isSuccess()) {
+            channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, upstreamHookResult.getRemark());
+            return;
+        }
+
         CompletableFuture<Void> future = new CompletableFuture<>();
         ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_SUBSCRIBE, future);
         scheduler.schedule(() -> {
@@ -83,11 +87,6 @@ public class MqttSubscribeHandler implements MqttPacketHandler<MqttSubscribeMess
                 future.complete(null);
             }
         },1,TimeUnit.SECONDS);
-        String remark = upstreamHookResult.getRemark();
-        if (!upstreamHookResult.isSuccess()) {
-            channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
-            return;
-        }
         try {
             MqttSubscribePayload payload = mqttMessage.payload();
             List<MqttTopicSubscription> mqttTopicSubscriptions = payload.topicSubscriptions();