You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/04/24 03:12:50 UTC

[rocketmq-mqtt] branch main updated: Define a subscription storage interface (#79)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/main by this push:
     new e0b5e20  Define a subscription storage interface (#79)
e0b5e20 is described below

commit e0b5e20abbafb1369e54c80146e9e4190ca8f853
Author: pingww <pi...@gmail.com>
AuthorDate: Sun Apr 24 11:12:46 2022 +0800

    Define a subscription storage interface (#79)
---
 mqtt-common/pom.xml                                |  4 ++
 .../common/facade/SubscriptionPersistManager.java  | 47 ++++++++-----
 .../rocketmq/mqtt/common/util/SpringUtils.java     | 21 +++---
 .../mqtt/cs/session/loop/SessionLoopImpl.java      | 29 +++++++++
 .../apache/rocketmq/mqtt/cs/starter/Startup.java   |  3 +-
 .../rocketmq/mqtt/cs/test/TestSessionLoopImpl.java | 18 +++++
 .../ds/upstream/processor/SubscribeProcessor.java  | 18 +++++
 .../upstream/processor/UnSubscribeProcessor.java   | 17 +++++
 .../mqtt/ds/test/TestSubscribeProcessor.java       | 73 +++++++++++++++++++++
 .../mqtt/ds/test/TestUnSubscribeProcessor.java     | 76 ++++++++++++++++++++++
 10 files changed, 279 insertions(+), 27 deletions(-)

diff --git a/mqtt-common/pom.xml b/mqtt-common/pom.xml
index f5c75b4..1cf838b 100644
--- a/mqtt-common/pom.xml
+++ b/mqtt-common/pom.xml
@@ -46,5 +46,9 @@
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/SubscriptionPersistManager.java
similarity index 52%
copy from mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
copy to mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/SubscriptionPersistManager.java
index 8d38042..ff49d5a 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/SubscriptionPersistManager.java
@@ -15,20 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.mqtt.cs.starter;
-
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
-
-
-public class Startup {
-
-    public static void main(String[] args) {
-        System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
-
-        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring.xml");
-
-        System.out.println("start rocketmq mqtt  ...");
-    }
-
+package org.apache.rocketmq.mqtt.common.facade;
+
+import org.apache.rocketmq.mqtt.common.model.Subscription;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+public interface SubscriptionPersistManager {
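+    /**
+     * Loads the stored subscriptions of the given client.
+     *
+     * @param clientId id of the client whose subscriptions are loaded
+     * @return a future that completes with the client's set of subscriptions
+     */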
+    CompletableFuture<Set<Subscription>> loadSubscriptions(String clientId);
+
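+    /**
+     * Saves the given subscriptions for the given client.
+     *
+     * @param clientId id of the client that owns the subscriptions
+     * @param subscriptions the subscriptions to save
+     */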
+    void saveSubscriptions(String clientId, Set<Subscription> subscriptions);
+
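+    /**
+     * Removes the given subscriptions of the given client.
+     *
+     * @param clientId id of the client that owns the subscriptions
+     * @param subscriptions the subscriptions to remove
+     */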
+    void removeSubscriptions(String clientId, Set<Subscription> subscriptions);
 }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/SpringUtils.java
similarity index 64%
copy from mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
copy to mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/SpringUtils.java
index 8d38042..6cbfa49 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/SpringUtils.java
@@ -15,20 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.mqtt.cs.starter;
+package org.apache.rocketmq.mqtt.common.util;
 
-import org.apache.rocketmq.client.log.ClientLogger;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
+public class SpringUtils {
+    private static ClassPathXmlApplicationContext applicationContext;
 
-public class Startup {
-
-    public static void main(String[] args) {
-        System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
-
-        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring.xml");
-
-        System.out.println("start rocketmq mqtt  ...");
+    public static void SetClassPathXmlApplicationContext(ClassPathXmlApplicationContext applicationContext) {
+        SpringUtils.applicationContext = applicationContext;
     }
 
+    public static <T> T getBean(Class<T> type) {
+        if (applicationContext == null) {
+            return null;
+        }
+        return applicationContext.getBean(type);
+    }
 }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
index 5ee3761..0dd0a01 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
@@ -23,10 +23,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
 import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
 import org.apache.rocketmq.mqtt.common.model.PullResult;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.QueueOffset;
 import org.apache.rocketmq.mqtt.common.model.Subscription;
+import org.apache.rocketmq.mqtt.common.util.SpringUtils;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
 import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
@@ -88,6 +90,7 @@ public class SessionLoopImpl implements SessionLoop {
     private ScheduledThreadPoolExecutor pullService;
     private ScheduledThreadPoolExecutor scheduler;
     private ScheduledThreadPoolExecutor persistOffsetScheduler;
+    private SubscriptionPersistManager subscriptionPersistManager;
 
     /**
      * channelId->session
@@ -336,6 +339,32 @@ public class SessionLoopImpl implements SessionLoop {
         if (session == null || session.isDestroyed()) {
             return;
         }
+        if (subscriptionPersistManager == null) {
+            subscriptionPersistManager = SpringUtils.getBean(SubscriptionPersistManager.class);
+        }
+        if (subscriptionPersistManager != null &&
+                !session.isClean() && !session.isLoaded()) {
+            if (session.isLoading()) {
+                return;
+            }
+            session.setLoading();
+            CompletableFuture<Set<Subscription>> future = subscriptionPersistManager.loadSubscriptions(session.getClientId());
+            future.whenComplete((subscriptions, throwable) -> {
+                if (throwable != null) {
+                    logger.error("", throwable);
+                    scheduler.schedule(() -> {
+                        session.resetLoad();
+                        notifyPullMessage(session, subscription, queue);
+                    }, 3, TimeUnit.SECONDS);
+                    return;
+                }
+                session.addSubscription(subscriptions);
+                matchAction.addSubscription(session, subscriptions);
+                session.setLoaded();
+                notifyPullMessage(session, subscription, queue);
+            });
+            return;
+        }
         if (queue != null) {
             if (subscription == null) {
                 throw new RuntimeException(
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
index 8d38042..7fa680f 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.mqtt.cs.starter;
 
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.mqtt.common.util.SpringUtils;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
 
@@ -27,7 +28,7 @@ public class Startup {
         System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
 
         ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring.xml");
-
+        SpringUtils.SetClassPathXmlApplicationContext(applicationContext);
         System.out.println("start rocketmq mqtt  ...");
     }
 
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestSessionLoopImpl.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestSessionLoopImpl.java
index c0d4c43..32bdd7e 100644
--- a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestSessionLoopImpl.java
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestSessionLoopImpl.java
@@ -23,6 +23,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
 import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.QueueOffset;
 import org.apache.rocketmq.mqtt.common.model.Subscription;
@@ -46,6 +47,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -165,4 +167,20 @@ public class TestSessionLoopImpl {
         verify(queueCache, atLeastOnce()).pullMessage(any(), any(), any(), any(), anyInt(), any());
 
     }
+
+    @Test
+    public void testLoadSubscription() throws IllegalAccessException, InterruptedException {
+        SubscriptionPersistManager subscriptionPersistManager = mock(SubscriptionPersistManager.class);
+        FieldUtils.writeDeclaredField(sessionLoop, "subscriptionPersistManager", subscriptionPersistManager, true);
+        SessionLoopImpl spySessionLoop = spy(sessionLoop);
+        CompletableFuture<Set<Subscription>> loadResult = new CompletableFuture();
+        loadResult.complete(new HashSet<>(Arrays.asList(new Subscription("test"))));
+        when(subscriptionPersistManager.loadSubscriptions(any())).thenReturn(loadResult);
+        spySessionLoop.init();
+        Session session = spy(new Session());
+        session.setChannel(new NioSocketChannel());
+        when(session.isClean()).thenReturn(Boolean.FALSE);
+        spySessionLoop.notifyPullMessage(session, new Subscription("test"), new Queue());
+        Assert.assertTrue(session.isLoaded());
+    }
 }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/SubscribeProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/SubscribeProcessor.java
index ce06ece..ddcc823 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/SubscribeProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/SubscribeProcessor.java
@@ -21,16 +21,21 @@ import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
 import io.netty.handler.codec.mqtt.MqttSubscribePayload;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
 import org.apache.rocketmq.mqtt.common.model.MqttTopic;
+import org.apache.rocketmq.mqtt.common.model.Subscription;
+import org.apache.rocketmq.mqtt.common.util.SpringUtils;
 import org.apache.rocketmq.mqtt.common.util.TopicUtils;
 import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
 import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 @Component
@@ -39,15 +44,28 @@ public class SubscribeProcessor implements UpstreamProcessor {
     @Resource
     private FirstTopicManager firstTopicManager;
 
+    private SubscriptionPersistManager subscriptionPersistManager;
+
     @Override
     public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage message) {
         MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message;
         MqttSubscribePayload payload = mqttSubscribeMessage.payload();
         List<MqttTopicSubscription> mqttTopicSubscriptions = payload.topicSubscriptions();
+        Set<Subscription> subscriptions = new HashSet<>();
         for (MqttTopicSubscription mqttTopicSubscription : mqttTopicSubscriptions) {
             String topicFilter = TopicUtils.normalizeTopic(mqttTopicSubscription.topicName());
             MqttTopic mqttTopic = TopicUtils.decode(topicFilter);
             firstTopicManager.checkFirstTopicIfCreated(mqttTopic.getFirstTopic());
+            Subscription subscription = new Subscription();
+            subscription.setTopicFilter(topicFilter);
+            subscription.setQos(mqttTopicSubscription.qualityOfService().value());
+            subscriptions.add(subscription);
+        }
+        if (subscriptionPersistManager == null) {
+            subscriptionPersistManager = SpringUtils.getBean(SubscriptionPersistManager.class);
+        }
+        if (subscriptionPersistManager != null) {
+            subscriptionPersistManager.saveSubscriptions(context.getClientId(), subscriptions);
         }
         return HookResult.newHookResult(HookResult.SUCCESS, null, null);
     }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/UnSubscribeProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/UnSubscribeProcessor.java
index 1e67703..046a9a8 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/UnSubscribeProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/UnSubscribeProcessor.java
@@ -19,15 +19,20 @@ package org.apache.rocketmq.mqtt.ds.upstream.processor;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
 import org.apache.rocketmq.mqtt.common.model.MqttTopic;
+import org.apache.rocketmq.mqtt.common.model.Subscription;
+import org.apache.rocketmq.mqtt.common.util.SpringUtils;
 import org.apache.rocketmq.mqtt.common.util.TopicUtils;
 import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
 import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 @Component
@@ -36,14 +41,26 @@ public class UnSubscribeProcessor implements UpstreamProcessor {
     @Resource
     private FirstTopicManager firstTopicManager;
 
+    private SubscriptionPersistManager subscriptionPersistManager;
+
     @Override
     public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage message) {
         MqttUnsubscribePayload payload = (MqttUnsubscribePayload) message.payload();
         if (payload.topics() != null && !payload.topics().isEmpty()) {
+            Set<Subscription> subscriptions = new HashSet<>();
             for (String topic : payload.topics()) {
                 String topicFilter = TopicUtils.normalizeTopic(topic);
                 MqttTopic mqttTopic = TopicUtils.decode(topicFilter);
                 firstTopicManager.checkFirstTopicIfCreated(mqttTopic.getFirstTopic());
+                Subscription subscription = new Subscription();
+                subscription.setTopicFilter(topicFilter);
+                subscriptions.add(subscription);
+            }
+            if (subscriptionPersistManager == null) {
+                subscriptionPersistManager = SpringUtils.getBean(SubscriptionPersistManager.class);
+            }
+            if (subscriptionPersistManager != null) {
+                subscriptionPersistManager.removeSubscriptions(context.getClientId(), subscriptions);
             }
         }
         return HookResult.newHookResult(HookResult.SUCCESS, null, null);
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestSubscribeProcessor.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestSubscribeProcessor.java
new file mode 100644
index 0000000..11fc358
--- /dev/null
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestSubscribeProcessor.java
@@ -0,0 +1,73 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.ds.test;
+
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttSubscribePayload;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
+import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
+import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
+import org.apache.rocketmq.mqtt.ds.upstream.processor.SubscribeProcessor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Arrays;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestSubscribeProcessor {
+
+    @Mock
+    private FirstTopicManager firstTopicManager;
+
+    @Mock
+    private SubscriptionPersistManager subscriptionPersistManager;
+
+    @Test
+    public void test() throws IllegalAccessException {
+        SubscribeProcessor subscribeProcessor = new SubscribeProcessor();
+        FieldUtils.writeDeclaredField(subscribeProcessor, "firstTopicManager", firstTopicManager, true);
+        FieldUtils.writeDeclaredField(subscribeProcessor, "subscriptionPersistManager", subscriptionPersistManager, true);
+
+        MqttMessageUpContext context = new MqttMessageUpContext();
+        context.setClientId("test");
+
+        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 1);
+        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(1);
+        MqttSubscribePayload payload = new MqttSubscribePayload(Arrays.asList(new MqttTopicSubscription("test", MqttQoS.AT_LEAST_ONCE)));
+        MqttSubscribeMessage mqttSubscribeMessage = new MqttSubscribeMessage(mqttFixedHeader, variableHeader, payload);
+
+        subscribeProcessor.process(context, mqttSubscribeMessage);
+        verify(firstTopicManager).checkFirstTopicIfCreated(any());
+        verify(subscriptionPersistManager).saveSubscriptions(any(), anySet());
+    }
+
+}
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestUnSubscribeProcessor.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestUnSubscribeProcessor.java
new file mode 100644
index 0000000..3c1c3af
--- /dev/null
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestUnSubscribeProcessor.java
@@ -0,0 +1,76 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.ds.test;
+
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttSubscribePayload;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
+import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
+import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
+import org.apache.rocketmq.mqtt.ds.upstream.processor.SubscribeProcessor;
+import org.apache.rocketmq.mqtt.ds.upstream.processor.UnSubscribeProcessor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Arrays;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestUnSubscribeProcessor {
+
+    @Mock
+    private FirstTopicManager firstTopicManager;
+
+    @Mock
+    private SubscriptionPersistManager subscriptionPersistManager;
+
+    @Test
+    public void test() throws IllegalAccessException {
+        UnSubscribeProcessor unSubscribeProcessor = new UnSubscribeProcessor();
+        FieldUtils.writeDeclaredField(unSubscribeProcessor, "firstTopicManager", firstTopicManager, true);
+        FieldUtils.writeDeclaredField(unSubscribeProcessor, "subscriptionPersistManager", subscriptionPersistManager, true);
+
+        MqttMessageUpContext context = new MqttMessageUpContext();
+        context.setClientId("test");
+
+        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 1);
+        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(1);
+        MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Arrays.asList("test"));
+        MqttUnsubscribeMessage mqttUnsubscribeMessage = new MqttUnsubscribeMessage(mqttFixedHeader, variableHeader, payload);
+
+        unSubscribeProcessor.process(context, mqttUnsubscribeMessage);
+        verify(firstTopicManager).checkFirstTopicIfCreated(any());
+        verify(subscriptionPersistManager).removeSubscriptions(any(), anySet());
+    }
+
+}