You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/04/06 06:50:17 UTC
[rocketmq-mqtt] branch main updated: make codeCov of mqtt.cs.channel more than 80% (#45)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new 4084f86 make codeCov of mqtt.cs.channel more than 80% (#45)
4084f86 is described below
commit 4084f8624b24a8317f381bdd395230b8b9c6c41b
Author: YongXing <xy...@gmail.com>
AuthorDate: Wed Apr 6 14:50:14 2022 +0800
make codeCov of mqtt.cs.channel more than 80% (#45)
Co-authored-by: YongXing [勇幸] <yo...@xiaomi.com>
---
.../rocketmq/mqtt/cs/channel/ChannelInfo.java | 12 +-
.../rocketmq/mqtt/cs/channel/ConnectHandler.java | 3 +-
.../mqtt/cs/channel/DefaultChannelManager.java | 20 ++--
.../mqtt/cs/test/channel/TestChannelInfo.java | 103 ++++++++++++++++
.../mqtt/cs/test/channel/TestConnectHandler.java | 74 ++++++++++++
.../cs/test/channel/TestDefaultChannelManager.java | 129 +++++++++++++++++++++
6 files changed, 317 insertions(+), 24 deletions(-)
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelInfo.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelInfo.java
index 9521f32..4e600af 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelInfo.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelInfo.java
@@ -93,11 +93,7 @@ public class ChannelInfo {
if (!getInfo(channel).containsKey(CHANNEL_EXT_CHANGE_KEY)) {
getInfo(channel).put(CHANNEL_EXT_CHANGE_KEY, false);
}
- Object obj = getInfo(channel).get(CHANNEL_EXT_CHANGE_KEY);
- if (obj == null) {
- return false;
- }
- return (boolean)obj;
+ return (boolean) getInfo(channel).get(CHANNEL_EXT_CHANGE_KEY);
}
public static String getId(Channel channel) {
@@ -114,11 +110,7 @@ public class ChannelInfo {
if (!getInfo(channel).containsKey(CHANNEL_CLEAN_SESSION_KEY)) {
getInfo(channel).put(CHANNEL_CLEAN_SESSION_KEY, true);
}
- Object obj = getInfo(channel).get(CHANNEL_CLEAN_SESSION_KEY);
- if (obj == null) {
- return true;
- }
- return (Boolean)obj;
+ return (Boolean) getInfo(channel).get(CHANNEL_CLEAN_SESSION_KEY);
}
public static void setCleanSessionFlag(Channel channel, Boolean cleanSessionFalg) {
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
index 318a951..a891d0a 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
@@ -53,8 +53,7 @@ public class ConnectHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (cause.getMessage() != null && simpleExceptions.contains(cause.getMessage())) {
- } else {
+ if (cause.getMessage() == null || !simpleExceptions.contains(cause.getMessage())) {
logger.error("exceptionCaught {}", ctx.channel(), cause);
}
channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.SERVER, cause.getMessage());
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
index 1caddbc..ee2e8b8 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
@@ -98,7 +98,7 @@ public class DefaultChannelManager implements ChannelManager {
TimeUnit.SECONDS);
}
} catch (Exception e) {
- logger.error("", e);
+ logger.error("Exception when doPing: ", e);
}
}
@@ -109,22 +109,18 @@ public class DefaultChannelManager implements ChannelManager {
if (clientId == null) {
channelMap.remove(channelId);
sessionLoop.unloadSession(clientId, channelId);
- if (channel.isActive()) {
- channel.close();
- }
- return;
+ } else {
+ // session may be null
+ Session session = sessionLoop.unloadSession(clientId, channelId);
+ retryDriver.unloadSession(session);
+ channelMap.remove(channelId);
+ ChannelInfo.clear(channel);
}
- //session maybe null
- Session session = sessionLoop.unloadSession(clientId, channelId);
- retryDriver.unloadSession(session);
- channelMap.remove(channelId);
-
- ChannelInfo.clear(channel);
-
if (channel.isActive()) {
channel.close();
}
+ logger.info("Close Connect of channel {} from {} by reason of {}", channel, from, reason);
}
@Override
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestChannelInfo.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestChannelInfo.java
new file mode 100644
index 0000000..ebf9f96
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestChannelInfo.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.channel;
+
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+public class TestChannelInfo {
+
+ @Test
+ public void test() {
+ NioSocketChannel channel = new NioSocketChannel();
+ String extDataStr = "{\"test\":\"extData\"}";
+
+ // test 'update/check/get/encode' of 'ExtData'
+ Assert.assertFalse(ChannelInfo.updateExtData(channel, ""));
+ Assert.assertFalse(ChannelInfo.checkExtDataChange(channel));
+ Assert.assertEquals(0, ChannelInfo.getExtData(channel).size());
+ // update 'ExtData'
+ Assert.assertTrue(ChannelInfo.updateExtData(channel, extDataStr));
+ Assert.assertTrue(ChannelInfo.checkExtDataChange(channel));
+ Assert.assertEquals(1, ChannelInfo.getExtData(channel).size());
+ Assert.assertEquals(extDataStr, ChannelInfo.encodeExtData(channel));
+
+ // test 'getId'
+ Assert.assertNotNull(ChannelInfo.getId(channel));
+
+ // test 'set/get CleanSessionFlag'
+ ChannelInfo.setCleanSessionFlag(channel, Boolean.FALSE);
+ Assert.assertFalse(ChannelInfo.getCleanSessionFlag(channel));
+
+ // test 'set/get ClientId'
+ String clientId = "testExtData";
+ ChannelInfo.setClientId(channel, clientId);
+ Assert.assertEquals(clientId, ChannelInfo.getClientId(channel));
+
+ // test 'set/get ChannelLifeCycle'
+ ChannelInfo.setChannelLifeCycle(channel, System.currentTimeMillis());
+ Assert.assertNotEquals(Long.MAX_VALUE, ChannelInfo.getChannelLifeCycle(channel));
+
+ // test 'set/get/remove Future'
+ String futureKey = "futureKey";
+ ChannelInfo.setFuture(channel, futureKey, new CompletableFuture<>());
+ Assert.assertNotNull(ChannelInfo.getFuture(channel, futureKey));
+ ChannelInfo.removeFuture(channel, futureKey);
+ Assert.assertNull(ChannelInfo.getFuture(channel, futureKey));
+
+ // test 'touch/getLastTouch'
+ Assert.assertEquals(0, ChannelInfo.getLastTouch(channel));
+ ChannelInfo.touch(channel);
+ Assert.assertNotEquals(0, ChannelInfo.getLastTouch(channel));
+
+ // test 'lastActive/getLastActive'
+ ChannelInfo.lastActive(channel, System.currentTimeMillis());
+ Assert.assertNotEquals(0, ChannelInfo.getLastActive(channel));
+
+ // test 'set/get RemoteIP'
+ ChannelInfo.setRemoteIP(channel, "");
+ Assert.assertEquals("", ChannelInfo.getRemoteIP(channel));
+
+ // test 'set/get KeepLive/isExpired'
+ Assert.assertTrue(ChannelInfo.isExpired(channel));
+ ChannelInfo.setKeepLive(channel, -1);
+ Assert.assertTrue(ChannelInfo.isExpired(channel));
+
+ // test 'set/get Owner/Namespace'
+ String ownerNamespc = "channelInfo";
+ ChannelInfo.setOwner(channel, ownerNamespc);
+ ChannelInfo.setNamespace(channel, ownerNamespc);
+ Assert.assertEquals(ownerNamespc, ChannelInfo.getOwner(channel));
+ Assert.assertEquals(ownerNamespc, ChannelInfo.getNamespace(channel));
+
+ // test 'clear'
+ ChannelInfo.clear(channel);
+ Assert.assertEquals(ownerNamespc, ChannelInfo.getNamespace(channel));
+ Assert.assertEquals(0, ChannelInfo.getExtData(channel).size());
+
+ if (channel.isActive()) {
+ channel.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestConnectHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestConnectHandler.java
new file mode 100644
index 0000000..5d9216a
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestConnectHandler.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.channel;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+import org.apache.rocketmq.mqtt.cs.channel.ConnectHandler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestConnectHandler {
+
+ private ConnectHandler connectHandler;
+
+ @Mock
+ private ChannelManager channelManager;
+
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Before
+ public void Before() throws IllegalAccessException {
+ connectHandler = new ConnectHandler();
+ FieldUtils.writeDeclaredField(connectHandler, "channelManager", channelManager, true);
+ }
+
+ @After
+ public void After() {
+ }
+
+ @Test
+ public void testChannelActive() throws Exception {
+ connectHandler.channelActive(ctx);
+ verify(channelManager).addChannel(any());
+ }
+
+ @Test
+ public void testChannelInactive() throws Exception {
+ connectHandler.channelInactive(ctx);
+ verify(channelManager).closeConnect(any(), any(), any());
+ }
+
+ @Test
+ public void testExceptionCaught() throws Exception {
+ connectHandler.exceptionCaught(ctx, new Throwable("err"));
+ verify(channelManager).closeConnect(any(), any(), any());
+ }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestDefaultChannelManager.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestDefaultChannelManager.java
new file mode 100644
index 0000000..f854652
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestDefaultChannelManager.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.channel;
+
+import io.netty.channel.Channel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
+import org.apache.rocketmq.mqtt.cs.channel.DefaultChannelManager;
+import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestDefaultChannelManager {
+ private DefaultChannelManager defaultChannelManager;
+ private final String clientId = "clientId";
+ private final String channelId = "channelId";
+
+ @Mock
+ private SessionLoop sessionLoop;
+
+ @Mock
+ private ConnectConf connectConf;
+
+ @Mock
+ private RetryDriver retryDriver;
+
+ @Spy
+ private NioSocketChannel channel;
+
+ @Before
+ public void Before() throws IllegalAccessException {
+ defaultChannelManager = new DefaultChannelManager();
+ FieldUtils.writeDeclaredField(defaultChannelManager, "sessionLoop", sessionLoop, true);
+ FieldUtils.writeDeclaredField(defaultChannelManager, "connectConf", connectConf, true);
+ FieldUtils.writeDeclaredField(defaultChannelManager, "retryDriver", retryDriver, true);
+ FieldUtils.writeStaticField(DefaultChannelManager.class, "minBlankChannelSeconds", 0, true);
+ defaultChannelManager.init();
+ }
+
+ @After
+ public void After() {
+ if (channel.isActive()) {
+ channel.close();
+ }
+ }
+
+ @Test
+ public void testAddChannel() {
+ ChannelInfo.setClientId(channel, clientId);
+ ChannelInfo.setChannelLifeCycle(channel, 1000L);
+ defaultChannelManager.addChannel(channel);
+
+ // wait for the 'doPing' TimerTask to execute
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {}
+
+ // verify 'doPing' and 'closeConnect'
+ verify(sessionLoop).unloadSession(Mockito.eq(clientId), anyString());
+ verify(retryDriver).unloadSession(Mockito.any());
+ }
+
+ @Test
+ public void testCloseConnectNullClientId() {
+ defaultChannelManager.closeConnect(channel, ChannelCloseFrom.CLIENT, "ForTest");
+ verify(sessionLoop).unloadSession(Mockito.isNull(), anyString());
+ }
+
+ @Test
+ public void testCloseConnect() {
+ ChannelInfo.setClientId(channel, clientId);
+ defaultChannelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "ForTest");
+ verify(sessionLoop).unloadSession(Mockito.eq(clientId), anyString());
+ verify(retryDriver).unloadSession(Mockito.any());
+ }
+
+ @Test
+ public void testCloseConnectNoFrom() throws IllegalAccessException {
+ defaultChannelManager.closeConnect(channelId, "ForTest");
+ Object channelMap = FieldUtils.readDeclaredField(defaultChannelManager, "channelMap", true);
+ Assert.assertEquals(0, ((Map<String, Channel>) channelMap).size());
+ }
+
+ @Test
+ public void testGetChannelById() {
+ Assert.assertNull(defaultChannelManager.getChannelById(channelId));
+ }
+
+ @Test
+ public void testTotalConn() {
+ Assert.assertEquals(0, defaultChannelManager.totalConn());
+ defaultChannelManager.addChannel(channel);
+ Assert.assertEquals(1, defaultChannelManager.totalConn());
+ }
+}
\ No newline at end of file