You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/04/11 02:21:05 UTC

[rocketmq-mqtt] branch main updated: code optimization and codeCov enhancing of mqtt.cs.protocol.mqtt.handler for #22 (#63)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/main by this push:
     new 1393642  code optimization and codeCov enhancing of mqtt.cs.protocol.mqtt.handler for #22 (#63)
1393642 is described below

commit 1393642c2397a72077892771883be9d5750611e5
Author: YongXing <xy...@gmail.com>
AuthorDate: Mon Apr 11 10:21:00 2022 +0800

    code optimization and codeCov enhancing of mqtt.cs.protocol.mqtt.handler for #22 (#63)
    
    Co-authored-by: AhaThinking <ah...@SK-20211027RHLU.mioffice.cn>
---
 .../protocol/mqtt/handler/MqttConnectHandler.java  |  10 +-
 .../cs/protocol/mqtt/handler/MqttPingHandler.java  |   8 --
 .../protocol/mqtt/handler/MqttPubAckHandler.java   |   4 +-
 .../protocol/mqtt/handler/MqttPubCompHandler.java  |  12 +-
 .../protocol/mqtt/handler/MqttPubRecHandler.java   |  14 +-
 .../protocol/mqtt/handler/MqttPublishHandler.java  |  32 ++---
 .../mqtt/handler/TestMqttConnectHandler.java       | 135 ++++++++++++++++++
 .../mqtt/handler/TestMqttDisconnectHandler.java    |  62 +++++++++
 .../protocol/mqtt/handler/TestMqttPingHandler.java |  63 +++++++++
 .../mqtt/handler/TestMqttPubAckHandler.java        | 103 ++++++++++++++
 .../mqtt/handler/TestMqttPubCompHandler.java       |  92 ++++++++++++
 .../mqtt/handler/TestMqttPubRecHandler.java        | 102 ++++++++++++++
 .../mqtt/handler/TestMqttPubRelHandler.java        |  87 ++++++++++++
 .../mqtt/handler/TestMqttPublishHandler.java       | 154 +++++++++++++++++++++
 14 files changed, 816 insertions(+), 62 deletions(-)

diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
index ecf3390..9a09a5d 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
@@ -33,7 +33,6 @@ import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
-import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
 import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
 import org.slf4j.Logger;
@@ -56,9 +55,6 @@ public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage>
     @Resource
     private SessionLoop sessionLoop;
 
-    @Resource
-    private ConnectConf connectConf;
-
     private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("check_connect_future"));
 
     @Override
@@ -73,10 +69,6 @@ public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage>
         if (!upstreamHookResult.isSuccess()) {
             byte connAckCode = (byte) upstreamHookResult.getSubCode();
             MqttConnectReturnCode mqttConnectReturnCode = MqttConnectReturnCode.valueOf(connAckCode);
-            if (mqttConnectReturnCode == null) {
-                channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
-                return;
-            }
             channel.writeAndFlush(getMqttConnAckMessage(mqttConnectReturnCode));
             channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
             return;
@@ -84,6 +76,8 @@ public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage>
 
         CompletableFuture<Void> future = new CompletableFuture<>();
         ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);
+
+        // use 'scheduler' to separate two i/o: 'ack to client' and 'session-load from rocketmq'
         scheduler.schedule(() -> {
             if (!future.isDone()) {
                 future.complete(null);
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java
index 7fa1fbd..6a5e42d 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler;
 
-
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
@@ -26,23 +25,16 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
-import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.Resource;
-
-
 
 @Component
 public class MqttPingHandler implements MqttPacketHandler<MqttMessage> {
     private static Logger logger = LoggerFactory.getLogger(MqttPingHandler.class);
 
-    @Resource
-    private ChannelManager channelManager;
-
     @Override
     public void doHandler(ChannelHandlerContext ctx, MqttMessage mqttMessage, HookResult upstreamHookResult) {
         MqttFixedHeader mqttFixedHeader =
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java
index ce9f2ea..77e4062 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java
@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttPubAckMessage;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
+import org.apache.rocketmq.mqtt.cs.session.Session;
 import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
 import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
 import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
@@ -51,6 +52,7 @@ public class MqttPubAckHandler implements MqttPacketHandler<MqttPubAckMessage> {
     public void doHandler(ChannelHandlerContext ctx, MqttPubAckMessage mqttMessage, HookResult upstreamHookResult) {
         int messageId = mqttMessage.variableHeader().messageId();
         retryDriver.unMountPublish(messageId, ChannelInfo.getId(ctx.channel()));
-        pushAction.rollNextByAck(sessionLoop.getSession(ChannelInfo.getId(ctx.channel())), messageId);
+        Session session = sessionLoop.getSession(ChannelInfo.getId(ctx.channel()));
+        pushAction.rollNextByAck(session, messageId);
     }
 }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java
index 27a805c..606027a 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java
@@ -23,8 +23,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
-import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
-import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
+import org.apache.rocketmq.mqtt.cs.session.Session;
 import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
 import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
 import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
@@ -39,12 +38,6 @@ public class MqttPubCompHandler implements MqttPacketHandler<MqttMessage> {
     @Resource
     private RetryDriver retryDriver;
 
-    @Resource
-    private InFlyCache inFlyCache;
-
-    @Resource
-    private MqttMsgId mqttMsgId;
-
     @Resource
     private PushAction pushAction;
 
@@ -59,7 +52,8 @@ public class MqttPubCompHandler implements MqttPacketHandler<MqttMessage> {
         retryDriver.unMountPubRel(variableHeader.messageId(), ChannelInfo.getId(ctx.channel()));
 
         //The Packet Identifier becomes available for reuse once the Sender has received the PUBCOMP Packet.
-        pushAction.rollNextByAck(sessionLoop.getSession(channelId), variableHeader.messageId());
+        Session session = sessionLoop.getSession(channelId);
+        pushAction.rollNextByAck(session, variableHeader.messageId());
     }
 
 }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
index 258ebf3..49e1dcb 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler;
 
-
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
@@ -27,26 +26,19 @@ import io.netty.handler.codec.mqtt.MqttQoS;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
-import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
 import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
-import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 
-
 @Component
 public class MqttPubRecHandler implements MqttPacketHandler<MqttMessage> {
+    private final MqttFixedHeader pubRelMqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false,
+            MqttQoS.AT_LEAST_ONCE, false, 0);
 
     @Resource
     private RetryDriver retryDriver;
 
-    @Resource
-    private InFlyCache inFlyCache;
-
-    @Resource
-    private SessionLoop sessionLoop;
-
     @Override
     public void doHandler(ChannelHandlerContext ctx, MqttMessage mqttMessage, HookResult upstreamHookResult) {
         MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
@@ -54,8 +46,6 @@ public class MqttPubRecHandler implements MqttPacketHandler<MqttMessage> {
         retryDriver.unMountPublish(variableHeader.messageId(), channelId);
         retryDriver.mountPubRel(variableHeader.messageId(), channelId);
 
-        MqttFixedHeader pubRelMqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false,
-                MqttQoS.AT_LEAST_ONCE, false, 0);
         MqttMessage pubRelMqttMessage = new MqttMessage(pubRelMqttFixedHeader, variableHeader);
         ctx.channel().writeAndFlush(pubRelMqttMessage);
     }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
index a8d17d2..c5594bc 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
@@ -31,10 +31,8 @@ import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
-import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
 import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
-import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -51,13 +49,6 @@ public class MqttPublishHandler implements MqttPacketHandler<MqttPublishMessage>
     @Resource
     private ChannelManager channelManager;
 
-    @Resource
-    private SessionLoop sessionLoop;
-
-    @Resource
-    private ConnectConf connectConf;
-
-
     @Override
     public void doHandler(ChannelHandlerContext ctx,
                           MqttPublishMessage mqttMessage,
@@ -71,16 +62,11 @@ public class MqttPublishHandler implements MqttPacketHandler<MqttPublishMessage>
             return;
         }
 
-        final boolean isQos2Message = isQos2Message(mqttMessage);
-        if (isQos2Message) {
-            if (inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, variableHeader.messageId())) {
-                doResponse(ctx, mqttMessage);
-                return;
-            }
-        }
         doResponse(ctx, mqttMessage);
-        if (isQos2Message) {
-            inFlyCache.put(InFlyCache.CacheType.PUB, channelId, variableHeader.messageId());
+
+        final boolean isQos2Message = isQos2Message(mqttMessage);
+        if (isQos2Message && !inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, variableHeader.packetId())) {
+            inFlyCache.put(InFlyCache.CacheType.PUB, channelId, variableHeader.packetId());
         }
     }
 
@@ -96,19 +82,17 @@ public class MqttPublishHandler implements MqttPacketHandler<MqttPublishMessage>
                 break;
             case AT_LEAST_ONCE:
                 MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false,
-                    MqttQoS.AT_MOST_ONCE,
-                    false, 0);
+                    MqttQoS.AT_MOST_ONCE, false, 0);
                 MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader
-                    .from(variableHeader.messageId());
+                    .from(variableHeader.packetId());
                 MqttPubAckMessage pubackMessage = new MqttPubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
                 ctx.channel().writeAndFlush(pubackMessage);
                 break;
             case EXACTLY_ONCE:
                 MqttFixedHeader pubrecMqttHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false,
-                    MqttQoS.AT_MOST_ONCE,
-                    false, 0);
+                    MqttQoS.AT_MOST_ONCE, false, 0);
                 MqttMessageIdVariableHeader pubrecMessageIdVariableHeader = MqttMessageIdVariableHeader
-                    .from(variableHeader.messageId());
+                    .from(variableHeader.packetId());
                 MqttMessage pubrecMqttMessage = new MqttMessage(pubrecMqttHeader, pubrecMessageIdVariableHeader);
                 ctx.channel().writeAndFlush(pubrecMqttMessage);
                 break;
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttConnectHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttConnectHandler.java
new file mode 100644
index 0000000..4ff35ef
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttConnectHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttConnAckMessage;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
+import io.netty.handler.codec.mqtt.MqttConnectPayload;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.common.model.Remark;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
+import org.apache.rocketmq.mqtt.cs.channel.DefaultChannelManager;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttConnectHandler;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttConnectHandler {
+    private MqttConnectHandler connectHandler;
+    private MqttConnectMessage connectMessage;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private DefaultChannelManager channelManager;
+
+    @Mock
+    private SessionLoop sessionLoop;
+
+    @Before
+    public void setUp() throws IllegalAccessException {
+        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
+        MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(null, 0, false,
+            false, false, 0, false, true, 1);
+        MqttConnectPayload payload = new MqttConnectPayload("testConnHandler", null, (byte[]) null, null, null);
+        connectMessage = new MqttConnectMessage(mqttFixedHeader, variableHeader, payload);
+
+        connectHandler = new MqttConnectHandler();
+        FieldUtils.writeDeclaredField(connectHandler, "channelManager", channelManager, true);
+        FieldUtils.writeDeclaredField(connectHandler, "sessionLoop", sessionLoop, true);
+
+        when(ctx.channel()).thenReturn(channel);
+    }
+
+    @After
+    public void After() {}
+
+    @Test
+    public void testDoHandlerAuthFailed() {
+        HookResult authFailHook = new HookResult(HookResult.FAIL,
+            MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD.byteValue(), Remark.AUTH_FAILED, null);
+        doReturn(null).when(channel).writeAndFlush(any());
+        doNothing().when(channelManager).closeConnect(channel, ChannelCloseFrom.SERVER, Remark.AUTH_FAILED);
+
+        connectHandler.doHandler(ctx, connectMessage, authFailHook);
+
+        verify(channel).writeAndFlush(any());
+        verify(channelManager).closeConnect(channel, ChannelCloseFrom.SERVER, Remark.AUTH_FAILED);
+        verifyNoMoreInteractions(channelManager, sessionLoop);
+    }
+
+    @Test
+    public void testDoHandlerChannelInActive() {
+        HookResult hookResult = new HookResult(HookResult.SUCCESS, Remark.SUCCESS, null);
+        doReturn(false).when(channel).isActive();
+        doNothing().when(sessionLoop).loadSession(any(), any());
+
+        connectHandler.doHandler(ctx, connectMessage, hookResult);
+
+        // wait scheduler execution
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ignored) {}
+
+        verify(sessionLoop).loadSession(any(), any());
+        verifyNoMoreInteractions(channelManager, sessionLoop);
+    }
+
+    @Test
+    public void testDoHandlerSuccess() {
+        HookResult hookResult = new HookResult(HookResult.SUCCESS, Remark.SUCCESS, null);
+        doReturn(true).when(channel).isActive();
+        doNothing().when(sessionLoop).loadSession(any(), any());
+
+        connectHandler.doHandler(ctx, connectMessage, hookResult);
+
+        // wait scheduler execution
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ignored) {}
+
+        verify(channel).writeAndFlush(any(MqttConnAckMessage.class));
+        verify(sessionLoop).loadSession(any(), any());
+        verifyNoMoreInteractions(channelManager, sessionLoop);
+    }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttDisconnectHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttDisconnectHandler.java
new file mode 100644
index 0000000..744dd53
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttDisconnectHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttDisconnectHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttDisconnectHandler {
+
+    private MqttDisconnectHandler disconnectHandler;
+
+    @Mock
+    private ChannelManager channelManager;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private MqttMessage mqttMessage;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Test
+    public void testDoHandler() throws IllegalAccessException {
+        disconnectHandler = new MqttDisconnectHandler();
+        FieldUtils.writeDeclaredField(disconnectHandler, "channelManager", channelManager, true);
+
+        disconnectHandler.doHandler(ctx, mqttMessage, hookResult);
+        verify(channelManager).closeConnect(any(), any(), any());
+        verifyNoMoreInteractions(channelManager);
+    }
+}
\ No newline at end of file
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPingHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPingHandler.java
new file mode 100644
index 0000000..3b3037c
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPingHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPingHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPingHandler {
+
+    private MqttPingHandler pingHandler;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private MqttMessage mqttMessage;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Test
+    public void testDoHandler() {
+        pingHandler = new MqttPingHandler();
+        when(ctx.channel()).thenReturn(channel);
+        doReturn(null).when(channel).writeAndFlush(any());
+
+        pingHandler.doHandler(ctx, mqttMessage, hookResult);
+        verify(ctx).channel();
+        verify(channel).writeAndFlush(any());
+    }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubAckHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubAckHandler.java
new file mode 100644
index 0000000..646d835
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubAckHandler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubAckHandler;
+import org.apache.rocketmq.mqtt.cs.session.Session;
+import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubAckHandler {
+    private MqttPubAckHandler pubAckHandler;
+    private MqttMessageIdVariableHeader variableHeader;
+    private MqttPubAckMessage pubAckMessage;
+    private MqttFixedHeader fixedHeader;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Mock
+    private PushAction pushAction;
+
+    @Mock
+    private RetryDriver retryDriver;
+
+    @Mock
+    private SessionLoop sessionLoop;
+
+    @Mock
+    private Session session;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Before
+    public void setUp() throws Exception {
+        variableHeader = MqttMessageIdVariableHeader.from(007);
+        fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
+        pubAckMessage = new MqttPubAckMessage(fixedHeader, variableHeader);
+    }
+
+    @Test
+    public void testDoHandler() throws IllegalAccessException {
+        pubAckHandler = new MqttPubAckHandler();
+        FieldUtils.writeDeclaredField(pubAckHandler, "pushAction", pushAction, true);
+        FieldUtils.writeDeclaredField(pubAckHandler, "retryDriver", retryDriver, true);
+        FieldUtils.writeDeclaredField(pubAckHandler, "sessionLoop", sessionLoop, true);
+
+        when(ctx.channel()).thenReturn(channel);
+        doReturn(null).when(retryDriver).unMountPublish(anyInt(), anyString());
+        doReturn(session).when(sessionLoop).getSession(anyString());
+        doNothing().when(pushAction).rollNextByAck(any(), anyInt());
+
+        pubAckHandler.doHandler(ctx, pubAckMessage, hookResult);
+        verify(ctx, times(2)).channel();
+        verify(sessionLoop).getSession(anyString());
+        verify(pushAction).rollNextByAck(eq(session), anyInt());
+    }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubCompHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubCompHandler.java
new file mode 100644
index 0000000..a500709
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubCompHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubCompHandler;
+import org.apache.rocketmq.mqtt.cs.session.Session;
+import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubCompHandler {
+    private MqttPubCompHandler pubCompHandler;
+
+    @Mock
+    private RetryDriver retryDriver;
+
+    @Mock
+    private PushAction pushAction;
+
+    @Mock
+    private SessionLoop sessionLoop;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private MqttMessage mqttMessage;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Mock
+    private Session session;
+
+    @Test
+    public void testDoHandler() throws IllegalAccessException {
+        pubCompHandler = new MqttPubCompHandler();
+        FieldUtils.writeDeclaredField(pubCompHandler, "retryDriver", retryDriver, true);
+        FieldUtils.writeDeclaredField(pubCompHandler, "pushAction", pushAction, true);
+        FieldUtils.writeDeclaredField(pubCompHandler, "sessionLoop", sessionLoop, true);
+
+        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(666);
+        when(mqttMessage.variableHeader()).thenReturn(variableHeader);
+        when(ctx.channel()).thenReturn(channel);
+        when(sessionLoop.getSession(any())).thenReturn(session);
+
+        pubCompHandler.doHandler(ctx, mqttMessage, hookResult);
+        verify(mqttMessage).variableHeader();
+        verify(ctx, times(2)).channel();
+        verify(retryDriver).unMountPubRel(anyInt(), anyString());
+        verify(sessionLoop).getSession(anyString());
+        verify(pushAction).rollNextByAck(eq(session), anyInt());
+    }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRecHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRecHandler.java
new file mode 100644
index 0000000..c62d12d
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRecHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubRecHandler;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubRecHandler {
+
+    private MqttPubRecHandler pubRecHandler;
+
+    private final int messageId = 666;
+    private final MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
+    private MqttFixedHeader expectedPubRelFixHeader;
+
+    @Mock
+    private RetryDriver retryDriver;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private MqttMessage mqttMessage;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Before
+    public void setUp() throws Exception {
+        pubRecHandler = new MqttPubRecHandler();
+        FieldUtils.writeDeclaredField(pubRecHandler, "retryDriver", retryDriver, true);
+
+        expectedPubRelFixHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+    }
+
+    @Test
+    public void testDoHandler() throws Exception {
+        when(ctx.channel()).thenReturn(channel);
+        when(mqttMessage.variableHeader()).thenReturn(variableHeader);
+        doReturn(null).when(channel).writeAndFlush(any(MqttMessage.class));
+
+        pubRecHandler.doHandler(ctx, mqttMessage, hookResult);
+
+        verify(ctx, times(2)).channel();
+        verify(mqttMessage).variableHeader();
+        verify(retryDriver).unMountPublish(eq(messageId), anyString());
+        verify(retryDriver).mountPubRel(eq(messageId), anyString());
+        verify(channel).writeAndFlush(any(MqttMessage.class));
+        verifyNoMoreInteractions(retryDriver, ctx, mqttMessage);
+
+        // check qosLevel of flushed pub-rel-mqtt-fixed-header
+        Field testFixedHeader = pubRecHandler.getClass().getDeclaredField("pubRelMqttFixedHeader");
+        testFixedHeader.setAccessible(true);
+        Assert.assertEquals(expectedPubRelFixHeader.toString(), testFixedHeader.get(pubRecHandler).toString());
+    }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRelHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRelHandler.java
new file mode 100644
index 0000000..dcb3e03
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRelHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubRelHandler;
+import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubRelHandler {
+
+    private MqttPubRelHandler pubRelHandler;
+
+    private final int messageId = 666;
+    private final MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
+
+    @Mock
+    private InFlyCache inFlyCache;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private MqttMessage mqttMessage;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Before
+    public void setUp() throws Exception {
+        pubRelHandler = new MqttPubRelHandler();
+        FieldUtils.writeDeclaredField(pubRelHandler, "inFlyCache", inFlyCache, true);
+    }
+
+    @Test
+    public void testDoHandler() {
+        when(ctx.channel()).thenReturn(channel);
+        when(mqttMessage.variableHeader()).thenReturn(variableHeader);
+        doReturn(null).when(channel).writeAndFlush(any(MqttMessage.class));
+
+        pubRelHandler.doHandler(ctx, mqttMessage, hookResult);
+
+        verify(ctx, times(2)).channel();
+        verify(mqttMessage).variableHeader();
+        verify(inFlyCache).remove(eq(InFlyCache.CacheType.PUB), anyString(), eq(messageId));
+        verify(channel).writeAndFlush(any(MqttMessage.class));
+        verifyNoMoreInteractions(inFlyCache, ctx, mqttMessage, hookResult);
+    }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPublishHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPublishHandler.java
new file mode 100644
index 0000000..8ab6324
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPublishHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.common.model.Remark;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPublishHandler;
+import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPublishHandler {
+    private final String topicName = "testMqttPub";
+    private final int packetId = 666;
+    private MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topicName, packetId);
+    private MqttPublishHandler publishHandler = new MqttPublishHandler();
+
+    private MqttFixedHeader atMostHeader;
+    private MqttFixedHeader atLeastHeader;
+    private MqttFixedHeader exactlyHeader;
+
+    private MqttPublishMessage atMostPubMessage;
+    private MqttPublishMessage atLeastPubMessage;
+    private MqttPublishMessage exactlyPubMessage;
+
+    private HookResult failHook;
+    private HookResult successHook;
+
+    @Mock
+    private InFlyCache inFlyCache;
+
+    @Mock
+    private ChannelManager channelManager;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Before
+    public void setUp() throws Exception {
+        FieldUtils.writeDeclaredField(publishHandler, "inFlyCache", inFlyCache, true);
+        FieldUtils.writeDeclaredField(publishHandler, "channelManager", channelManager, true);
+
+        atMostHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0);
+        atMostPubMessage = new MqttPublishMessage(atMostHeader, variableHeader, null);
+        atLeastHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+        atLeastPubMessage = new MqttPublishMessage(atLeastHeader, variableHeader, null);
+        exactlyHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.EXACTLY_ONCE, false, 0);
+        exactlyPubMessage = new MqttPublishMessage(exactlyHeader, variableHeader, null);
+
+        failHook = new HookResult(HookResult.FAIL, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED.byteValue(),
+                Remark.INVALID_PARAM, null);
+        successHook = new HookResult(HookResult.SUCCESS, Remark.SUCCESS, null);
+
+        when(ctx.channel()).thenReturn(channel);
+        doReturn(null).when(channel).writeAndFlush(any());
+    }
+
+    @Test
+    public void testDoHandlerHookFail() {
+        publishHandler.doHandler(ctx, atMostPubMessage, failHook);
+
+        verify(ctx).channel();
+        verify(channelManager).closeConnect(eq(channel), eq(ChannelCloseFrom.SERVER), eq(Remark.INVALID_PARAM));
+        verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+    }
+
+    @Test
+    public void testDoHandlerAtMostOnce() {
+        publishHandler.doHandler(ctx, atMostPubMessage, successHook);
+
+        verify(ctx).channel();
+        verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+    }
+
+    @Test
+    public void testDoHandlerAtLeastOnce() {
+        publishHandler.doHandler(ctx, atLeastPubMessage, successHook);
+
+        verify(ctx, times(2)).channel();
+        verify(channel).writeAndFlush(any(MqttPubAckMessage.class));
+        verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+    }
+
+    @Test
+    public void testDoHandlerExactlyOnceCacheHit() {
+        doReturn(true).when(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), anyString(), eq(packetId));
+
+        publishHandler.doHandler(ctx, exactlyPubMessage, successHook);
+
+        verify(ctx, times(2)).channel();
+        verify(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), anyString(), eq(packetId));
+        verify(channel).writeAndFlush(any(MqttMessage.class));
+        verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+    }
+
+    @Test
+    public void testDoHandlerExactlyOnceCacheNotHit() {
+        doReturn(false).when(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), anyString(), eq(packetId));
+
+        publishHandler.doHandler(ctx, exactlyPubMessage, successHook);
+
+        verify(ctx, times(2)).channel();
+        verify(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), anyString(), eq(packetId));
+        verify(channel).writeAndFlush(any(MqttMessage.class));
+        verify(inFlyCache).put(eq(InFlyCache.CacheType.PUB), anyString(), eq(packetId));
+        verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+    }
+
+}
\ No newline at end of file