You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/04/11 02:21:05 UTC
[rocketmq-mqtt] branch main updated: code optimization and codeCov enhancing of mqtt.cs.protocol.mqtt.handler for #22 (#63)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new 1393642 code optimization and codeCov enhancing of mqtt.cs.protocol.mqtt.handler for #22 (#63)
1393642 is described below
commit 1393642c2397a72077892771883be9d5750611e5
Author: YongXing <xy...@gmail.com>
AuthorDate: Mon Apr 11 10:21:00 2022 +0800
code optimization and codeCov enhancing of mqtt.cs.protocol.mqtt.handler for #22 (#63)
Co-authored-by: AhaThinking <ah...@SK-20211027RHLU.mioffice.cn>
---
.../protocol/mqtt/handler/MqttConnectHandler.java | 10 +-
.../cs/protocol/mqtt/handler/MqttPingHandler.java | 8 --
.../protocol/mqtt/handler/MqttPubAckHandler.java | 4 +-
.../protocol/mqtt/handler/MqttPubCompHandler.java | 12 +-
.../protocol/mqtt/handler/MqttPubRecHandler.java | 14 +-
.../protocol/mqtt/handler/MqttPublishHandler.java | 32 ++---
.../mqtt/handler/TestMqttConnectHandler.java | 135 ++++++++++++++++++
.../mqtt/handler/TestMqttDisconnectHandler.java | 62 +++++++++
.../protocol/mqtt/handler/TestMqttPingHandler.java | 63 +++++++++
.../mqtt/handler/TestMqttPubAckHandler.java | 103 ++++++++++++++
.../mqtt/handler/TestMqttPubCompHandler.java | 92 ++++++++++++
.../mqtt/handler/TestMqttPubRecHandler.java | 102 ++++++++++++++
.../mqtt/handler/TestMqttPubRelHandler.java | 87 ++++++++++++
.../mqtt/handler/TestMqttPublishHandler.java | 154 +++++++++++++++++++++
14 files changed, 816 insertions(+), 62 deletions(-)
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
index ecf3390..9a09a5d 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
@@ -33,7 +33,6 @@ import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
-import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
import org.slf4j.Logger;
@@ -56,9 +55,6 @@ public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage>
@Resource
private SessionLoop sessionLoop;
- @Resource
- private ConnectConf connectConf;
-
private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("check_connect_future"));
@Override
@@ -73,10 +69,6 @@ public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage>
if (!upstreamHookResult.isSuccess()) {
byte connAckCode = (byte) upstreamHookResult.getSubCode();
MqttConnectReturnCode mqttConnectReturnCode = MqttConnectReturnCode.valueOf(connAckCode);
- if (mqttConnectReturnCode == null) {
- channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
- return;
- }
channel.writeAndFlush(getMqttConnAckMessage(mqttConnectReturnCode));
channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
return;
@@ -84,6 +76,8 @@ public class MqttConnectHandler implements MqttPacketHandler<MqttConnectMessage>
CompletableFuture<Void> future = new CompletableFuture<>();
ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);
+
+ // use 'scheduler' to separate two i/o: 'ack to client' and 'session-load from rocketmq'
scheduler.schedule(() -> {
if (!future.isDone()) {
future.complete(null);
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java
index 7fa1fbd..6a5e42d 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler;
-
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
@@ -26,23 +25,16 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
-import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import javax.annotation.Resource;
-
-
@Component
public class MqttPingHandler implements MqttPacketHandler<MqttMessage> {
private static Logger logger = LoggerFactory.getLogger(MqttPingHandler.class);
- @Resource
- private ChannelManager channelManager;
-
@Override
public void doHandler(ChannelHandlerContext ctx, MqttMessage mqttMessage, HookResult upstreamHookResult) {
MqttFixedHeader mqttFixedHeader =
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java
index ce9f2ea..77e4062 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java
@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
+import org.apache.rocketmq.mqtt.cs.session.Session;
import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
@@ -51,6 +52,7 @@ public class MqttPubAckHandler implements MqttPacketHandler<MqttPubAckMessage> {
public void doHandler(ChannelHandlerContext ctx, MqttPubAckMessage mqttMessage, HookResult upstreamHookResult) {
int messageId = mqttMessage.variableHeader().messageId();
retryDriver.unMountPublish(messageId, ChannelInfo.getId(ctx.channel()));
- pushAction.rollNextByAck(sessionLoop.getSession(ChannelInfo.getId(ctx.channel())), messageId);
+ Session session = sessionLoop.getSession(ChannelInfo.getId(ctx.channel()));
+ pushAction.rollNextByAck(session, messageId);
}
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java
index 27a805c..606027a 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java
@@ -23,8 +23,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
-import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
-import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
+import org.apache.rocketmq.mqtt.cs.session.Session;
import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
@@ -39,12 +38,6 @@ public class MqttPubCompHandler implements MqttPacketHandler<MqttMessage> {
@Resource
private RetryDriver retryDriver;
- @Resource
- private InFlyCache inFlyCache;
-
- @Resource
- private MqttMsgId mqttMsgId;
-
@Resource
private PushAction pushAction;
@@ -59,7 +52,8 @@ public class MqttPubCompHandler implements MqttPacketHandler<MqttMessage> {
retryDriver.unMountPubRel(variableHeader.messageId(), ChannelInfo.getId(ctx.channel()));
//The Packet Identifier becomes available for reuse once the Sender has received the PUBCOMP Packet.
- pushAction.rollNextByAck(sessionLoop.getSession(channelId), variableHeader.messageId());
+ Session session = sessionLoop.getSession(channelId);
+ pushAction.rollNextByAck(session, variableHeader.messageId());
}
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
index 258ebf3..49e1dcb 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler;
-
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
@@ -27,26 +26,19 @@ import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
-import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
-import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
-
@Component
public class MqttPubRecHandler implements MqttPacketHandler<MqttMessage> {
+ private final MqttFixedHeader pubRelMqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false,
+ MqttQoS.AT_LEAST_ONCE, false, 0);
@Resource
private RetryDriver retryDriver;
- @Resource
- private InFlyCache inFlyCache;
-
- @Resource
- private SessionLoop sessionLoop;
-
@Override
public void doHandler(ChannelHandlerContext ctx, MqttMessage mqttMessage, HookResult upstreamHookResult) {
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
@@ -54,8 +46,6 @@ public class MqttPubRecHandler implements MqttPacketHandler<MqttMessage> {
retryDriver.unMountPublish(variableHeader.messageId(), channelId);
retryDriver.mountPubRel(variableHeader.messageId(), channelId);
- MqttFixedHeader pubRelMqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false,
- MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessage pubRelMqttMessage = new MqttMessage(pubRelMqttFixedHeader, variableHeader);
ctx.channel().writeAndFlush(pubRelMqttMessage);
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
index a8d17d2..c5594bc 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
@@ -31,10 +31,8 @@ import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
-import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
-import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -51,13 +49,6 @@ public class MqttPublishHandler implements MqttPacketHandler<MqttPublishMessage>
@Resource
private ChannelManager channelManager;
- @Resource
- private SessionLoop sessionLoop;
-
- @Resource
- private ConnectConf connectConf;
-
-
@Override
public void doHandler(ChannelHandlerContext ctx,
MqttPublishMessage mqttMessage,
@@ -71,16 +62,11 @@ public class MqttPublishHandler implements MqttPacketHandler<MqttPublishMessage>
return;
}
- final boolean isQos2Message = isQos2Message(mqttMessage);
- if (isQos2Message) {
- if (inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, variableHeader.messageId())) {
- doResponse(ctx, mqttMessage);
- return;
- }
- }
doResponse(ctx, mqttMessage);
- if (isQos2Message) {
- inFlyCache.put(InFlyCache.CacheType.PUB, channelId, variableHeader.messageId());
+
+ final boolean isQos2Message = isQos2Message(mqttMessage);
+ if (isQos2Message && !inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, variableHeader.packetId())) {
+ inFlyCache.put(InFlyCache.CacheType.PUB, channelId, variableHeader.packetId());
}
}
@@ -96,19 +82,17 @@ public class MqttPublishHandler implements MqttPacketHandler<MqttPublishMessage>
break;
case AT_LEAST_ONCE:
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false,
- MqttQoS.AT_MOST_ONCE,
- false, 0);
+ MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader
- .from(variableHeader.messageId());
+ .from(variableHeader.packetId());
MqttPubAckMessage pubackMessage = new MqttPubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
ctx.channel().writeAndFlush(pubackMessage);
break;
case EXACTLY_ONCE:
MqttFixedHeader pubrecMqttHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false,
- MqttQoS.AT_MOST_ONCE,
- false, 0);
+ MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader pubrecMessageIdVariableHeader = MqttMessageIdVariableHeader
- .from(variableHeader.messageId());
+ .from(variableHeader.packetId());
MqttMessage pubrecMqttMessage = new MqttMessage(pubrecMqttHeader, pubrecMessageIdVariableHeader);
ctx.channel().writeAndFlush(pubrecMqttMessage);
break;
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttConnectHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttConnectHandler.java
new file mode 100644
index 0000000..4ff35ef
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttConnectHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttConnAckMessage;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
+import io.netty.handler.codec.mqtt.MqttConnectPayload;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.common.model.Remark;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
+import org.apache.rocketmq.mqtt.cs.channel.DefaultChannelManager;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttConnectHandler;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttConnectHandler {
+ private MqttConnectHandler connectHandler;
+ private MqttConnectMessage connectMessage;
+
+ @Spy
+ private NioSocketChannel channel;
+
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Mock
+ private DefaultChannelManager channelManager;
+
+ @Mock
+ private SessionLoop sessionLoop;
+
+ @Before
+ public void setUp() throws IllegalAccessException {
+ MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
+ MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(null, 0, false,
+ false, false, 0, false, true, 1);
+ MqttConnectPayload payload = new MqttConnectPayload("testConnHandler", null, (byte[]) null, null, null);
+ connectMessage = new MqttConnectMessage(mqttFixedHeader, variableHeader, payload);
+
+ connectHandler = new MqttConnectHandler();
+ FieldUtils.writeDeclaredField(connectHandler, "channelManager", channelManager, true);
+ FieldUtils.writeDeclaredField(connectHandler, "sessionLoop", sessionLoop, true);
+
+ when(ctx.channel()).thenReturn(channel);
+ }
+
+ @After
+ public void After() {}
+
+ @Test
+ public void testDoHandlerAuthFailed() {
+ HookResult authFailHook = new HookResult(HookResult.FAIL,
+ MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD.byteValue(), Remark.AUTH_FAILED, null);
+ doReturn(null).when(channel).writeAndFlush(any());
+ doNothing().when(channelManager).closeConnect(channel, ChannelCloseFrom.SERVER, Remark.AUTH_FAILED);
+
+ connectHandler.doHandler(ctx, connectMessage, authFailHook);
+
+ verify(channel).writeAndFlush(any());
+ verify(channelManager).closeConnect(channel, ChannelCloseFrom.SERVER, Remark.AUTH_FAILED);
+ verifyNoMoreInteractions(channelManager, sessionLoop);
+ }
+
+ @Test
+ public void testDoHandlerChannelInActive() {
+ HookResult hookResult = new HookResult(HookResult.SUCCESS, Remark.SUCCESS, null);
+ doReturn(false).when(channel).isActive();
+ doNothing().when(sessionLoop).loadSession(any(), any());
+
+ connectHandler.doHandler(ctx, connectMessage, hookResult);
+
+ // wait scheduler execution
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {}
+
+ verify(sessionLoop).loadSession(any(), any());
+ verifyNoMoreInteractions(channelManager, sessionLoop);
+ }
+
+ @Test
+ public void testDoHandlerSuccess() {
+ HookResult hookResult = new HookResult(HookResult.SUCCESS, Remark.SUCCESS, null);
+ doReturn(true).when(channel).isActive();
+ doNothing().when(sessionLoop).loadSession(any(), any());
+
+ connectHandler.doHandler(ctx, connectMessage, hookResult);
+
+ // wait scheduler execution
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {}
+
+ verify(channel).writeAndFlush(any(MqttConnAckMessage.class));
+ verify(sessionLoop).loadSession(any(), any());
+ verifyNoMoreInteractions(channelManager, sessionLoop);
+ }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttDisconnectHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttDisconnectHandler.java
new file mode 100644
index 0000000..744dd53
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttDisconnectHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttDisconnectHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttDisconnectHandler {
+
+ private MqttDisconnectHandler disconnectHandler;
+
+ @Mock
+ private ChannelManager channelManager;
+
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Mock
+ private MqttMessage mqttMessage;
+
+ @Mock
+ private HookResult hookResult;
+
+ @Test
+ public void testDoHandler() throws IllegalAccessException {
+ disconnectHandler = new MqttDisconnectHandler();
+ FieldUtils.writeDeclaredField(disconnectHandler, "channelManager", channelManager, true);
+
+ disconnectHandler.doHandler(ctx, mqttMessage, hookResult);
+ verify(channelManager).closeConnect(any(), any(), any());
+ verifyNoMoreInteractions(channelManager);
+ }
+}
\ No newline at end of file
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPingHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPingHandler.java
new file mode 100644
index 0000000..3b3037c
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPingHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPingHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPingHandler {
+
+ private MqttPingHandler pingHandler;
+
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Mock
+ private MqttMessage mqttMessage;
+
+ @Mock
+ private HookResult hookResult;
+
+ @Spy
+ private NioSocketChannel channel;
+
+ @Test
+ public void testDoHandler() {
+ pingHandler = new MqttPingHandler();
+ when(ctx.channel()).thenReturn(channel);
+ doReturn(null).when(channel).writeAndFlush(any());
+
+ pingHandler.doHandler(ctx, mqttMessage, hookResult);
+ verify(ctx).channel();
+ verify(channel).writeAndFlush(any());
+ }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubAckHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubAckHandler.java
new file mode 100644
index 0000000..646d835
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubAckHandler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubAckHandler;
+import org.apache.rocketmq.mqtt.cs.session.Session;
+import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubAckHandler {
+ private MqttPubAckHandler pubAckHandler;
+ private MqttMessageIdVariableHeader variableHeader;
+ private MqttPubAckMessage pubAckMessage;
+ private MqttFixedHeader fixedHeader;
+
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Mock
+ private HookResult hookResult;
+
+ @Mock
+ private PushAction pushAction;
+
+ @Mock
+ private RetryDriver retryDriver;
+
+ @Mock
+ private SessionLoop sessionLoop;
+
+ @Mock
+ private Session session;
+
+ @Spy
+ private NioSocketChannel channel;
+
+ @Before
+ public void setUp() throws Exception {
+ variableHeader = MqttMessageIdVariableHeader.from(007);
+ fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
+ pubAckMessage = new MqttPubAckMessage(fixedHeader, variableHeader);
+ }
+
+ @Test
+ public void testDoHandler() throws IllegalAccessException {
+ pubAckHandler = new MqttPubAckHandler();
+ FieldUtils.writeDeclaredField(pubAckHandler, "pushAction", pushAction, true);
+ FieldUtils.writeDeclaredField(pubAckHandler, "retryDriver", retryDriver, true);
+ FieldUtils.writeDeclaredField(pubAckHandler, "sessionLoop", sessionLoop, true);
+
+ when(ctx.channel()).thenReturn(channel);
+ doReturn(null).when(retryDriver).unMountPublish(anyInt(), anyString());
+ doReturn(session).when(sessionLoop).getSession(anyString());
+ doNothing().when(pushAction).rollNextByAck(any(), anyInt());
+
+ pubAckHandler.doHandler(ctx, pubAckMessage, hookResult);
+ verify(ctx, times(2)).channel();
+ verify(sessionLoop).getSession(anyString());
+ verify(pushAction).rollNextByAck(eq(session), anyInt());
+ }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubCompHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubCompHandler.java
new file mode 100644
index 0000000..a500709
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubCompHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubCompHandler;
+import org.apache.rocketmq.mqtt.cs.session.Session;
+import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubCompHandler {
+ private MqttPubCompHandler pubCompHandler;
+
+ @Mock
+ private RetryDriver retryDriver;
+
+ @Mock
+ private PushAction pushAction;
+
+ @Mock
+ private SessionLoop sessionLoop;
+
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Mock
+ private MqttMessage mqttMessage;
+
+ @Mock
+ private HookResult hookResult;
+
+ @Spy
+ private NioSocketChannel channel;
+
+ @Mock
+ private Session session;
+
+ @Test
+ public void testDoHandler() throws IllegalAccessException {
+ pubCompHandler = new MqttPubCompHandler();
+ FieldUtils.writeDeclaredField(pubCompHandler, "retryDriver", retryDriver, true);
+ FieldUtils.writeDeclaredField(pubCompHandler, "pushAction", pushAction, true);
+ FieldUtils.writeDeclaredField(pubCompHandler, "sessionLoop", sessionLoop, true);
+
+ MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(666);
+ when(mqttMessage.variableHeader()).thenReturn(variableHeader);
+ when(ctx.channel()).thenReturn(channel);
+ when(sessionLoop.getSession(any())).thenReturn(session);
+
+ pubCompHandler.doHandler(ctx, mqttMessage, hookResult);
+ verify(mqttMessage).variableHeader();
+ verify(ctx, times(2)).channel();
+ verify(retryDriver).unMountPubRel(anyInt(), anyString());
+ verify(sessionLoop).getSession(anyString());
+ verify(pushAction).rollNextByAck(eq(session), anyInt());
+ }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRecHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRecHandler.java
new file mode 100644
index 0000000..c62d12d
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRecHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubRecHandler;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubRecHandler {
+
+ private MqttPubRecHandler pubRecHandler;
+
+ private final int messageId = 666;
+ private final MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
+ private MqttFixedHeader expectedPubRelFixHeader;
+
+ @Mock
+ private RetryDriver retryDriver;
+
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Mock
+ private MqttMessage mqttMessage;
+
+ @Mock
+ private HookResult hookResult;
+
+ @Spy
+ private NioSocketChannel channel;
+
+ @Before
+ public void setUp() throws Exception {
+ pubRecHandler = new MqttPubRecHandler();
+ FieldUtils.writeDeclaredField(pubRecHandler, "retryDriver", retryDriver, true);
+
+ expectedPubRelFixHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+ }
+
+ @Test
+ public void testDoHandler() throws Exception {
+ when(ctx.channel()).thenReturn(channel);
+ when(mqttMessage.variableHeader()).thenReturn(variableHeader);
+ doReturn(null).when(channel).writeAndFlush(any(MqttMessage.class));
+
+ pubRecHandler.doHandler(ctx, mqttMessage, hookResult);
+
+ verify(ctx, times(2)).channel();
+ verify(mqttMessage).variableHeader();
+ verify(retryDriver).unMountPublish(eq(messageId), anyString());
+ verify(retryDriver).mountPubRel(eq(messageId), anyString());
+ verify(channel).writeAndFlush(any(MqttMessage.class));
+ verifyNoMoreInteractions(retryDriver, ctx, mqttMessage);
+
+ // check qosLevel of flushed pub-rel-mqtt-fixed-header
+ Field testFixedHeader = pubRecHandler.getClass().getDeclaredField("pubRelMqttFixedHeader");
+ testFixedHeader.setAccessible(true);
+ Assert.assertEquals(expectedPubRelFixHeader.toString(), testFixedHeader.get(pubRecHandler).toString());
+ }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRelHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRelHandler.java
new file mode 100644
index 0000000..dcb3e03
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRelHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubRelHandler;
+import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubRelHandler {
+
+ private MqttPubRelHandler pubRelHandler;
+
+ private final int messageId = 666;
+ private final MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
+
+ @Mock
+ private InFlyCache inFlyCache;
+
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Mock
+ private MqttMessage mqttMessage;
+
+ @Mock
+ private HookResult hookResult;
+
+ @Spy
+ private NioSocketChannel channel;
+
+ @Before
+ public void setUp() throws Exception {
+ pubRelHandler = new MqttPubRelHandler();
+ FieldUtils.writeDeclaredField(pubRelHandler, "inFlyCache", inFlyCache, true);
+ }
+
+ @Test
+ public void testDoHandler() {
+ when(ctx.channel()).thenReturn(channel);
+ when(mqttMessage.variableHeader()).thenReturn(variableHeader);
+ doReturn(null).when(channel).writeAndFlush(any(MqttMessage.class));
+
+ pubRelHandler.doHandler(ctx, mqttMessage, hookResult);
+
+ verify(ctx, times(2)).channel();
+ verify(mqttMessage).variableHeader();
+ verify(inFlyCache).remove(eq(InFlyCache.CacheType.PUB), anyString(), eq(messageId));
+ verify(channel).writeAndFlush(any(MqttMessage.class));
+ verifyNoMoreInteractions(inFlyCache, ctx, mqttMessage, hookResult);
+ }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPublishHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPublishHandler.java
new file mode 100644
index 0000000..8ab6324
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPublishHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.common.model.Remark;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPublishHandler;
+import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPublishHandler {
+ private final String topicName = "testMqttPub";
+ private final int packetId = 666;
+ private MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topicName, packetId);
+ private MqttPublishHandler publishHandler = new MqttPublishHandler();
+
+ private MqttFixedHeader atMostHeader;
+ private MqttFixedHeader atLeastHeader;
+ private MqttFixedHeader exactlyHeader;
+
+ private MqttPublishMessage atMostPubMessage;
+ private MqttPublishMessage atLeastPubMessage;
+ private MqttPublishMessage exactlyPubMessage;
+
+ private HookResult failHook;
+ private HookResult successHook;
+
+ @Mock
+ private InFlyCache inFlyCache;
+
+ @Mock
+ private ChannelManager channelManager;
+
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Spy
+ private NioSocketChannel channel;
+
+ @Before
+ public void setUp() throws Exception {
+ FieldUtils.writeDeclaredField(publishHandler, "inFlyCache", inFlyCache, true);
+ FieldUtils.writeDeclaredField(publishHandler, "channelManager", channelManager, true);
+
+ atMostHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0);
+ atMostPubMessage = new MqttPublishMessage(atMostHeader, variableHeader, null);
+ atLeastHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+ atLeastPubMessage = new MqttPublishMessage(atLeastHeader, variableHeader, null);
+ exactlyHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.EXACTLY_ONCE, false, 0);
+ exactlyPubMessage = new MqttPublishMessage(exactlyHeader, variableHeader, null);
+
+ failHook = new HookResult(HookResult.FAIL, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED.byteValue(),
+ Remark.INVALID_PARAM, null);
+ successHook = new HookResult(HookResult.SUCCESS, Remark.SUCCESS, null);
+
+ when(ctx.channel()).thenReturn(channel);
+ doReturn(null).when(channel).writeAndFlush(any());
+ }
+
+ @Test
+ public void testDoHandlerHookFail() {
+ publishHandler.doHandler(ctx, atMostPubMessage, failHook);
+
+ verify(ctx).channel();
+ verify(channelManager).closeConnect(eq(channel), eq(ChannelCloseFrom.SERVER), eq(Remark.INVALID_PARAM));
+ verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+ }
+
+ @Test
+ public void testDoHandlerAtMostOnce() {
+ publishHandler.doHandler(ctx, atMostPubMessage, successHook);
+
+ verify(ctx).channel();
+ verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+ }
+
+ @Test
+ public void testDoHandlerAtLeastOnce() {
+ publishHandler.doHandler(ctx, atLeastPubMessage, successHook);
+
+ verify(ctx, times(2)).channel();
+ verify(channel).writeAndFlush(any(MqttPubAckMessage.class));
+ verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+ }
+
+ @Test
+ public void testDoHandlerExactlyOnceCacheHit() {
+ doReturn(true).when(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), anyString(), eq(packetId));
+
+ publishHandler.doHandler(ctx, exactlyPubMessage, successHook);
+
+ verify(ctx, times(2)).channel();
+ verify(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), anyString(), eq(packetId));
+ verify(channel).writeAndFlush(any(MqttMessage.class));
+ verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+ }
+
+ @Test
+ public void testDoHandlerExactlyOnceCacheNotHit() {
+ doReturn(false).when(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), anyString(), eq(packetId));
+
+ publishHandler.doHandler(ctx, exactlyPubMessage, successHook);
+
+ verify(ctx, times(2)).channel();
+ verify(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), anyString(), eq(packetId));
+ verify(channel).writeAndFlush(any(MqttMessage.class));
+ verify(inFlyCache).put(eq(InFlyCache.CacheType.PUB), anyString(), eq(packetId));
+ verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+ }
+
+}
\ No newline at end of file