You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/04/06 06:50:17 UTC

[rocketmq-mqtt] branch main updated: make codeCov of mqtt.cs.channel more than 80% (#45)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/main by this push:
     new 4084f86  make codeCov of mqtt.cs.channel more than 80% (#45)
4084f86 is described below

commit 4084f8624b24a8317f381bdd395230b8b9c6c41b
Author: YongXing <xy...@gmail.com>
AuthorDate: Wed Apr 6 14:50:14 2022 +0800

    make codeCov of mqtt.cs.channel more than 80% (#45)
    
    Co-authored-by: YongXing [勇幸] <yo...@xiaomi.com>
---
 .../rocketmq/mqtt/cs/channel/ChannelInfo.java      |  12 +-
 .../rocketmq/mqtt/cs/channel/ConnectHandler.java   |   3 +-
 .../mqtt/cs/channel/DefaultChannelManager.java     |  20 ++--
 .../mqtt/cs/test/channel/TestChannelInfo.java      | 103 ++++++++++++++++
 .../mqtt/cs/test/channel/TestConnectHandler.java   |  74 ++++++++++++
 .../cs/test/channel/TestDefaultChannelManager.java | 129 +++++++++++++++++++++
 6 files changed, 317 insertions(+), 24 deletions(-)

diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelInfo.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelInfo.java
index 9521f32..4e600af 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelInfo.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ChannelInfo.java
@@ -93,11 +93,7 @@ public class ChannelInfo {
         if (!getInfo(channel).containsKey(CHANNEL_EXT_CHANGE_KEY)) {
             getInfo(channel).put(CHANNEL_EXT_CHANGE_KEY, false);
         }
-        Object obj = getInfo(channel).get(CHANNEL_EXT_CHANGE_KEY);
-        if (obj == null) {
-            return false;
-        }
-        return (boolean)obj;
+        return (boolean) getInfo(channel).get(CHANNEL_EXT_CHANGE_KEY);
     }
 
     public static String getId(Channel channel) {
@@ -114,11 +110,7 @@ public class ChannelInfo {
         if (!getInfo(channel).containsKey(CHANNEL_CLEAN_SESSION_KEY)) {
             getInfo(channel).put(CHANNEL_CLEAN_SESSION_KEY, true);
         }
-        Object obj = getInfo(channel).get(CHANNEL_CLEAN_SESSION_KEY);
-        if (obj == null) {
-            return true;
-        }
-        return (Boolean)obj;
+        return (Boolean) getInfo(channel).get(CHANNEL_CLEAN_SESSION_KEY);
     }
 
     public static void setCleanSessionFlag(Channel channel, Boolean cleanSessionFalg) {
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
index 318a951..a891d0a 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/ConnectHandler.java
@@ -53,8 +53,7 @@ public class ConnectHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        if (cause.getMessage() != null && simpleExceptions.contains(cause.getMessage())) {
-        } else {
+        if (cause.getMessage() == null || !simpleExceptions.contains(cause.getMessage())) {
             logger.error("exceptionCaught {}", ctx.channel(), cause);
         }
         channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.SERVER, cause.getMessage());
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
index 1caddbc..ee2e8b8 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
@@ -98,7 +98,7 @@ public class DefaultChannelManager implements ChannelManager {
                     TimeUnit.SECONDS);
             }
         } catch (Exception e) {
-            logger.error("", e);
+            logger.error("Exception when doPing: ", e);
         }
     }
 
@@ -109,22 +109,18 @@ public class DefaultChannelManager implements ChannelManager {
         if (clientId == null) {
             channelMap.remove(channelId);
             sessionLoop.unloadSession(clientId, channelId);
-            if (channel.isActive()) {
-                channel.close();
-            }
-            return;
+        } else {
+            //session maybe null
+            Session session = sessionLoop.unloadSession(clientId, channelId);
+            retryDriver.unloadSession(session);
+            channelMap.remove(channelId);
+            ChannelInfo.clear(channel);
         }
 
-        //session maybe null
-        Session session = sessionLoop.unloadSession(clientId, channelId);
-        retryDriver.unloadSession(session);
-        channelMap.remove(channelId);
-
-        ChannelInfo.clear(channel);
-
         if (channel.isActive()) {
             channel.close();
         }
+        logger.info("Close Connect of channel {} from {} by reason of {}", channel, from, reason);
     }
 
     @Override
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestChannelInfo.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestChannelInfo.java
new file mode 100644
index 0000000..ebf9f96
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestChannelInfo.java
@@ -0,0 +1,103 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.channel;
+
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Walks every static accessor of {@link ChannelInfo} against a single, never-connected channel. */
+public class TestChannelInfo {
+    @Test
+    public void test() {
+        final NioSocketChannel ch = new NioSocketChannel();
+        final String extJson = "{\"test\":\"extData\"}";
+        final String id = "testExtData";
+        final String pendingKey = "futureKey";
+        final String tenant = "channelInfo";
+
+        // ExtData: an empty payload is rejected and nothing is recorded
+        Assert.assertFalse(ChannelInfo.updateExtData(ch, ""));
+        Assert.assertFalse(ChannelInfo.checkExtDataChange(ch));
+        Assert.assertEquals(0, ChannelInfo.getExtData(ch).size());
+        // ExtData: a JSON payload is accepted, sets the change flag and round-trips through encode
+        Assert.assertTrue(ChannelInfo.updateExtData(ch, extJson));
+        Assert.assertTrue(ChannelInfo.checkExtDataChange(ch));
+        Assert.assertEquals(1, ChannelInfo.getExtData(ch).size());
+        Assert.assertEquals(extJson, ChannelInfo.encodeExtData(ch));
+
+        // channel id: must be available
+        Assert.assertNotNull(ChannelInfo.getId(ch));
+
+        // clean-session flag: the stored value is read back
+        ChannelInfo.setCleanSessionFlag(ch, Boolean.FALSE);
+        Assert.assertFalse(ChannelInfo.getCleanSessionFlag(ch));
+
+        // client id: the stored value is read back
+        ChannelInfo.setClientId(ch, id);
+        Assert.assertEquals(id, ChannelInfo.getClientId(ch));
+
+        // life cycle: after setting 'now', the getter must not report Long.MAX_VALUE
+        ChannelInfo.setChannelLifeCycle(ch, System.currentTimeMillis());
+        Assert.assertNotEquals(Long.MAX_VALUE, ChannelInfo.getChannelLifeCycle(ch));
+
+        // pending futures: registered, looked up, then removed by key
+        ChannelInfo.setFuture(ch, pendingKey, new CompletableFuture<>());
+        Assert.assertNotNull(ChannelInfo.getFuture(ch, pendingKey));
+        ChannelInfo.removeFuture(ch, pendingKey);
+        Assert.assertNull(ChannelInfo.getFuture(ch, pendingKey));
+
+        // last touch: 0 until 'touch' is called
+        Assert.assertEquals(0, ChannelInfo.getLastTouch(ch));
+        ChannelInfo.touch(ch);
+        Assert.assertNotEquals(0, ChannelInfo.getLastTouch(ch));
+
+        // last active: non-zero after being set to 'now'
+        ChannelInfo.lastActive(ch, System.currentTimeMillis());
+        Assert.assertNotEquals(0, ChannelInfo.getLastActive(ch));
+
+        // remote IP: an empty string is stored as-is
+        ChannelInfo.setRemoteIP(ch, "");
+        Assert.assertEquals("", ChannelInfo.getRemoteIP(ch));
+
+        // expiry: reported as expired both before and after a negative keep-alive is set
+        Assert.assertTrue(ChannelInfo.isExpired(ch));
+        ChannelInfo.setKeepLive(ch, -1);
+        Assert.assertTrue(ChannelInfo.isExpired(ch));
+
+        // owner and namespace: the same value is stored in both and read back from each
+        ChannelInfo.setOwner(ch, tenant);
+        ChannelInfo.setNamespace(ch, tenant);
+        Assert.assertEquals(tenant, ChannelInfo.getOwner(ch));
+        Assert.assertEquals(tenant, ChannelInfo.getNamespace(ch));
+
+        // clear: ext data is dropped while the namespace is still readable
+        ChannelInfo.clear(ch);
+        Assert.assertEquals(tenant, ChannelInfo.getNamespace(ch));
+        Assert.assertEquals(0, ChannelInfo.getExtData(ch).size());
+
+        if (ch.isActive()) {
+            ch.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestConnectHandler.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestConnectHandler.java
new file mode 100644
index 0000000..5d9216a
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestConnectHandler.java
@@ -0,0 +1,74 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.channel;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+import org.apache.rocketmq.mqtt.cs.channel.ConnectHandler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+
+/** Unit tests for {@link ConnectHandler}: channel events must be delegated to the {@link ChannelManager}. */
+@RunWith(MockitoJUnitRunner.class)
+public class TestConnectHandler {
+    private ConnectHandler connectHandler;
+
+    @Mock
+    private ChannelManager channelManager;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Before
+    public void setUp() throws IllegalAccessException {
+        connectHandler = new ConnectHandler();
+        FieldUtils.writeDeclaredField(connectHandler, "channelManager", channelManager, true);
+    }
+
+    @After
+    public void tearDown() {
+    }
+
+    @Test
+    public void testChannelActive() throws Exception {
+        connectHandler.channelActive(ctx);
+        verify(channelManager).addChannel(any());
+    }
+
+    @Test
+    public void testChannelInactive() throws Exception {
+        connectHandler.channelInactive(ctx);
+        verify(channelManager).closeConnect(any(), any(), any());
+    }
+
+    @Test
+    public void testExceptionCaught() throws Exception {
+        connectHandler.exceptionCaught(ctx, new Throwable("err"));
+        verify(channelManager).closeConnect(any(), any(), any());
+    }
+}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestDefaultChannelManager.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestDefaultChannelManager.java
new file mode 100644
index 0000000..f854652
--- /dev/null
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/channel/TestDefaultChannelManager.java
@@ -0,0 +1,129 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.channel;
+
+import io.netty.channel.Channel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
+import org.apache.rocketmq.mqtt.cs.channel.DefaultChannelManager;
+import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
+
+/** Unit tests for {@link DefaultChannelManager} using mocked collaborators and a spied channel. */
+@RunWith(MockitoJUnitRunner.class)
+public class TestDefaultChannelManager {
+    private DefaultChannelManager defaultChannelManager;
+    private final String clientId = "clientId";
+    private final String channelId = "channelId";
+    // value of the static 'minBlankChannelSeconds' before setUp overrides it; restored in tearDown
+    private Object savedMinBlank;
+
+    @Mock
+    private SessionLoop sessionLoop;
+
+    @Mock
+    private ConnectConf connectConf;
+
+    @Mock
+    private RetryDriver retryDriver;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Before
+    public void setUp() throws IllegalAccessException {
+        savedMinBlank = FieldUtils.readStaticField(DefaultChannelManager.class, "minBlankChannelSeconds", true);
+        defaultChannelManager = new DefaultChannelManager();
+        FieldUtils.writeDeclaredField(defaultChannelManager, "sessionLoop", sessionLoop, true);
+        FieldUtils.writeDeclaredField(defaultChannelManager, "connectConf", connectConf, true);
+        FieldUtils.writeDeclaredField(defaultChannelManager, "retryDriver", retryDriver, true);
+        FieldUtils.writeStaticField(DefaultChannelManager.class, "minBlankChannelSeconds", 0, true);
+        defaultChannelManager.init();
+    }
+
+    @After
+    public void tearDown() throws IllegalAccessException {
+        FieldUtils.writeStaticField(DefaultChannelManager.class, "minBlankChannelSeconds", savedMinBlank, true);
+        if (channel.isActive()) {
+            channel.close();
+        }
+    }
+
+    @Test
+    public void testAddChannel() {
+        ChannelInfo.setClientId(channel, clientId);
+        ChannelInfo.setChannelLifeCycle(channel, 1000L);
+        defaultChannelManager.addChannel(channel);
+
+        // 'doPing' runs as a TimerTask: poll up to 3s for its 'closeConnect' instead of a fixed sleep
+        verify(sessionLoop, Mockito.timeout(3000)).unloadSession(Mockito.eq(clientId), anyString());
+        verify(retryDriver, Mockito.timeout(3000)).unloadSession(Mockito.any());
+    }
+
+    @Test
+    public void testCloseConnectNullClientId() {
+        defaultChannelManager.closeConnect(channel, ChannelCloseFrom.CLIENT, "ForTest");
+        verify(sessionLoop).unloadSession(Mockito.isNull(), anyString());
+    }
+
+    @Test
+    public void testCloseConnect() {
+        ChannelInfo.setClientId(channel, clientId);
+        defaultChannelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "ForTest");
+        verify(sessionLoop).unloadSession(Mockito.eq(clientId), anyString());
+        verify(retryDriver).unloadSession(Mockito.any());
+    }
+
+    @Test
+    public void testCloseConnectNoFrom() throws IllegalAccessException {
+        defaultChannelManager.closeConnect(channelId, "ForTest");
+        Object channelMap = FieldUtils.readDeclaredField(defaultChannelManager, "channelMap", true);
+        Assert.assertEquals(0, ((Map<String, Channel>) channelMap).size());
+    }
+
+    @Test
+    public void testGetChannelById() {
+        Assert.assertNull(defaultChannelManager.getChannelById(channelId));
+    }
+
+    @Test
+    public void testTotalConn() {
+        Assert.assertEquals(0, defaultChannelManager.totalConn());
+        defaultChannelManager.addChannel(channel);
+        Assert.assertEquals(1, defaultChannelManager.totalConn());
+    }
+}
\ No newline at end of file