You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/04/24 03:12:50 UTC
[rocketmq-mqtt] branch main updated: Define a subscription storage interface (#79)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new e0b5e20 Define a subscription storage interface (#79)
e0b5e20 is described below
commit e0b5e20abbafb1369e54c80146e9e4190ca8f853
Author: pingww <pi...@gmail.com>
AuthorDate: Sun Apr 24 11:12:46 2022 +0800
Define a subscription storage interface (#79)
---
mqtt-common/pom.xml | 4 ++
.../common/facade/SubscriptionPersistManager.java | 47 ++++++++-----
.../rocketmq/mqtt/common/util/SpringUtils.java | 21 +++---
.../mqtt/cs/session/loop/SessionLoopImpl.java | 29 +++++++++
.../apache/rocketmq/mqtt/cs/starter/Startup.java | 3 +-
.../rocketmq/mqtt/cs/test/TestSessionLoopImpl.java | 18 +++++
.../ds/upstream/processor/SubscribeProcessor.java | 18 +++++
.../upstream/processor/UnSubscribeProcessor.java | 17 +++++
.../mqtt/ds/test/TestSubscribeProcessor.java | 73 +++++++++++++++++++++
.../mqtt/ds/test/TestUnSubscribeProcessor.java | 76 ++++++++++++++++++++++
10 files changed, 279 insertions(+), 27 deletions(-)
diff --git a/mqtt-common/pom.xml b/mqtt-common/pom.xml
index f5c75b4..1cf838b 100644
--- a/mqtt-common/pom.xml
+++ b/mqtt-common/pom.xml
@@ -46,5 +46,9 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/SubscriptionPersistManager.java
similarity index 52%
copy from mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
copy to mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/SubscriptionPersistManager.java
index 8d38042..ff49d5a 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/SubscriptionPersistManager.java
@@ -15,20 +15,35 @@
* limitations under the License.
*/
-package org.apache.rocketmq.mqtt.cs.starter;
-
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
-
-
-public class Startup {
-
- public static void main(String[] args) {
- System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
-
- ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring.xml");
-
- System.out.println("start rocketmq mqtt ...");
- }
-
+package org.apache.rocketmq.mqtt.common.facade;
+
+import org.apache.rocketmq.mqtt.common.model.Subscription;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+public interface SubscriptionPersistManager {
+ /**
+ * Loads the subscriptions stored for a client.
+ *
+ * @param clientId identifier of the client whose subscriptions are requested
+ * @return a future that yields the set of subscriptions held for {@code clientId}
+ */
+ CompletableFuture<Set<Subscription>> loadSubscriptions(String clientId);
+
+ /**
+ * Stores the given subscriptions for a client.
+ * Whether they replace or are merged with earlier entries is not specified by this
+ * interface and is left to the implementation.
+ *
+ * @param clientId identifier of the client that owns the subscriptions
+ * @param subscriptions subscriptions to store
+ */
+ void saveSubscriptions(String clientId, Set<Subscription> subscriptions);
+
+ /**
+ * Removes the given subscriptions of a client from storage.
+ *
+ * @param clientId identifier of the client that owns the subscriptions
+ * @param subscriptions subscriptions to remove
+ */
+ void removeSubscriptions(String clientId, Set<Subscription> subscriptions);
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/SpringUtils.java
similarity index 64%
copy from mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
copy to mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/SpringUtils.java
index 8d38042..6cbfa49 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/SpringUtils.java
@@ -15,20 +15,21 @@
* limitations under the License.
*/
-package org.apache.rocketmq.mqtt.cs.starter;
+package org.apache.rocketmq.mqtt.common.util;
-import org.apache.rocketmq.client.log.ClientLogger;
import org.springframework.context.support.ClassPathXmlApplicationContext;
+public class SpringUtils {
+ private static ClassPathXmlApplicationContext applicationContext;
-public class Startup {
-
- public static void main(String[] args) {
- System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
-
- ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring.xml");
-
- System.out.println("start rocketmq mqtt ...");
+ public static void SetClassPathXmlApplicationContext(ClassPathXmlApplicationContext applicationContext) { // NOTE(review): name starts with an upper-case letter, against Java convention; Startup calls it by this exact name
+ SpringUtils.applicationContext = applicationContext; // kept in a static field so getBean can consult it later
}
+ /**
+  * Looks up a bean of the given type in the registered application context.
+  *
+  * <p>Callers treat a {@code null} result as "not configured" and skip the related work,
+  * so a missing bean is reported as {@code null} rather than as an exception.
+  *
+  * @param type class of the bean to look up
+  * @param <T> bean type
+  * @return the bean, or {@code null} when no context has been registered yet or the
+  *         context defines no bean of that type
+  */
+ public static <T> T getBean(Class<T> type) {
+     if (applicationContext == null) {
+         return null;
+     }
+     // getBean(Class) throws NoSuchBeanDefinitionException when nothing matches, which
+     // would defeat the null checks at the call sites; probe for a definition first.
+     if (applicationContext.getBeanNamesForType(type).length == 0) {
+         return null;
+     }
+     return applicationContext.getBean(type);
+ }
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
index 5ee3761..0dd0a01 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
@@ -23,10 +23,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
import org.apache.rocketmq.mqtt.common.model.PullResult;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.common.model.Subscription;
+import org.apache.rocketmq.mqtt.common.util.SpringUtils;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
@@ -88,6 +90,7 @@ public class SessionLoopImpl implements SessionLoop {
private ScheduledThreadPoolExecutor pullService;
private ScheduledThreadPoolExecutor scheduler;
private ScheduledThreadPoolExecutor persistOffsetScheduler;
+ private SubscriptionPersistManager subscriptionPersistManager;
/**
* channelId->session
@@ -336,6 +339,32 @@ public class SessionLoopImpl implements SessionLoop {
if (session == null || session.isDestroyed()) {
return;
}
+ if (subscriptionPersistManager == null) {
+ subscriptionPersistManager = SpringUtils.getBean(SubscriptionPersistManager.class);
+ }
+ if (subscriptionPersistManager != null &&
+ !session.isClean() && !session.isLoaded()) {
+ if (session.isLoading()) {
+ return;
+ }
+ session.setLoading();
+ CompletableFuture<Set<Subscription>> future = subscriptionPersistManager.loadSubscriptions(session.getClientId());
+ future.whenComplete((subscriptions, throwable) -> {
+ if (throwable != null) {
+ logger.error("", throwable);
+ scheduler.schedule(() -> {
+ session.resetLoad();
+ notifyPullMessage(session, subscription, queue);
+ }, 3, TimeUnit.SECONDS);
+ return;
+ }
+ session.addSubscription(subscriptions);
+ matchAction.addSubscription(session, subscriptions);
+ session.setLoaded();
+ notifyPullMessage(session, subscription, queue);
+ });
+ return;
+ }
if (queue != null) {
if (subscription == null) {
throw new RuntimeException(
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
index 8d38042..7fa680f 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.mqtt.cs.starter;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.mqtt.common.util.SpringUtils;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -27,7 +28,7 @@ public class Startup {
System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring.xml");
-
+ SpringUtils.SetClassPathXmlApplicationContext(applicationContext);
System.out.println("start rocketmq mqtt ...");
}
diff --git a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestSessionLoopImpl.java b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestSessionLoopImpl.java
index c0d4c43..32bdd7e 100644
--- a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestSessionLoopImpl.java
+++ b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/TestSessionLoopImpl.java
@@ -23,6 +23,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.common.model.Subscription;
@@ -46,6 +47,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -165,4 +167,20 @@ public class TestSessionLoopImpl {
verify(queueCache, atLeastOnce()).pullMessage(any(), any(), any(), any(), anyInt(), any());
}
+
+ /**
+  * Verifies that a non-clean session is marked as loaded after a pull notification when a
+  * {@link SubscriptionPersistManager} is present.
+  *
+  * <p>The mocked manager hands back an already-completed future, so the completion callback
+  * attached to it runs on the calling thread and the session state can be asserted right
+  * after {@code notifyPullMessage} returns.
+  */
+ @Test
+ public void testLoadSubscription() throws IllegalAccessException, InterruptedException {
+     SubscriptionPersistManager subscriptionPersistManager = mock(SubscriptionPersistManager.class);
+     FieldUtils.writeDeclaredField(sessionLoop, "subscriptionPersistManager", subscriptionPersistManager, true);
+     SessionLoopImpl spySessionLoop = spy(sessionLoop);
+     // Diamond instead of the raw type: keeps the assignment free of unchecked warnings.
+     CompletableFuture<Set<Subscription>> loadResult = new CompletableFuture<>();
+     loadResult.complete(new HashSet<>(Arrays.asList(new Subscription("test"))));
+     when(subscriptionPersistManager.loadSubscriptions(any())).thenReturn(loadResult);
+     spySessionLoop.init();
+     Session session = spy(new Session());
+     session.setChannel(new NioSocketChannel());
+     // Only non-clean sessions go through the load path.
+     when(session.isClean()).thenReturn(Boolean.FALSE);
+     spySessionLoop.notifyPullMessage(session, new Subscription("test"), new Queue());
+     Assert.assertTrue(session.isLoaded());
+ }
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/SubscribeProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/SubscribeProcessor.java
index ce06ece..ddcc823 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/SubscribeProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/SubscribeProcessor.java
@@ -21,16 +21,21 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
import org.apache.rocketmq.mqtt.common.model.MqttTopic;
+import org.apache.rocketmq.mqtt.common.model.Subscription;
+import org.apache.rocketmq.mqtt.common.util.SpringUtils;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
@Component
@@ -39,15 +44,28 @@ public class SubscribeProcessor implements UpstreamProcessor {
@Resource
private FirstTopicManager firstTopicManager;
+ private SubscriptionPersistManager subscriptionPersistManager;
+
@Override
public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage message) { // checks the first-level topic of every requested filter, then hands the subscriptions to the persist manager when one is available
MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message;
MqttSubscribePayload payload = mqttSubscribeMessage.payload();
List<MqttTopicSubscription> mqttTopicSubscriptions = payload.topicSubscriptions();
+ Set<Subscription> subscriptions = new HashSet<>(); // collected while iterating the payload, passed to saveSubscriptions below
for (MqttTopicSubscription mqttTopicSubscription : mqttTopicSubscriptions) {
String topicFilter = TopicUtils.normalizeTopic(mqttTopicSubscription.topicName());
MqttTopic mqttTopic = TopicUtils.decode(topicFilter);
firstTopicManager.checkFirstTopicIfCreated(mqttTopic.getFirstTopic());
+ Subscription subscription = new Subscription();
+ subscription.setTopicFilter(topicFilter); // the normalized filter, not the raw topic name from the packet
+ subscription.setQos(mqttTopicSubscription.qualityOfService().value()); // numeric value of the QoS requested for this filter
+ subscriptions.add(subscription);
+ }
+ if (subscriptionPersistManager == null) { // field is not annotated for injection; it is looked up on demand
+ subscriptionPersistManager = SpringUtils.getBean(SubscriptionPersistManager.class);
+ }
+ if (subscriptionPersistManager != null) { // still null when the lookup returned null; persistence is then skipped
+ subscriptionPersistManager.saveSubscriptions(context.getClientId(), subscriptions);
}
return HookResult.newHookResult(HookResult.SUCCESS, null, null); // reports SUCCESS whenever this point is reached
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/UnSubscribeProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/UnSubscribeProcessor.java
index 1e67703..046a9a8 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/UnSubscribeProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/UnSubscribeProcessor.java
@@ -19,15 +19,20 @@ package org.apache.rocketmq.mqtt.ds.upstream.processor;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
import org.apache.rocketmq.mqtt.common.model.MqttTopic;
+import org.apache.rocketmq.mqtt.common.model.Subscription;
+import org.apache.rocketmq.mqtt.common.util.SpringUtils;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
@Component
@@ -36,14 +41,26 @@ public class UnSubscribeProcessor implements UpstreamProcessor {
@Resource
private FirstTopicManager firstTopicManager;
+ private SubscriptionPersistManager subscriptionPersistManager;
+
@Override
public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage message) {
MqttUnsubscribePayload payload = (MqttUnsubscribePayload) message.payload();
if (payload.topics() != null && !payload.topics().isEmpty()) {
+ Set<Subscription> subscriptions = new HashSet<>();
for (String topic : payload.topics()) {
String topicFilter = TopicUtils.normalizeTopic(topic);
MqttTopic mqttTopic = TopicUtils.decode(topicFilter);
firstTopicManager.checkFirstTopicIfCreated(mqttTopic.getFirstTopic());
+ Subscription subscription = new Subscription();
+ subscription.setTopicFilter(topicFilter);
+ subscriptions.add(subscription);
+ }
+ if (subscriptionPersistManager == null) {
+ subscriptionPersistManager = SpringUtils.getBean(SubscriptionPersistManager.class);
+ }
+ if (subscriptionPersistManager != null) {
+ subscriptionPersistManager.removeSubscriptions(context.getClientId(), subscriptions);
}
}
return HookResult.newHookResult(HookResult.SUCCESS, null, null);
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestSubscribeProcessor.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestSubscribeProcessor.java
new file mode 100644
index 0000000..11fc358
--- /dev/null
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestSubscribeProcessor.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.ds.test;
+
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttSubscribePayload;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
+import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
+import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
+import org.apache.rocketmq.mqtt.ds.upstream.processor.SubscribeProcessor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Arrays;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit test for {@link SubscribeProcessor}: a SUBSCRIBE message must trigger the first-topic
+ * check and be forwarded to the {@link SubscriptionPersistManager}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestSubscribeProcessor {
+
+    @Mock
+    private FirstTopicManager firstTopicManager;
+
+    @Mock
+    private SubscriptionPersistManager subscriptionPersistManager;
+
+    /**
+     * Feeds one QoS-1 subscription for topic "test" through the processor and checks that both
+     * collaborators were invoked.
+     */
+    @Test
+    public void test() throws IllegalAccessException {
+        MqttMessageUpContext upContext = new MqttMessageUpContext();
+        upContext.setClientId("test");
+
+        // The processor's collaborators are private fields, so they are set reflectively.
+        SubscribeProcessor processor = new SubscribeProcessor();
+        FieldUtils.writeDeclaredField(processor, "firstTopicManager", firstTopicManager, true);
+        FieldUtils.writeDeclaredField(processor, "subscriptionPersistManager", subscriptionPersistManager, true);
+
+        processor.process(upContext, newSubscribeMessage());
+
+        verify(firstTopicManager).checkFirstTopicIfCreated(any());
+        verify(subscriptionPersistManager).saveSubscriptions(any(), anySet());
+    }
+
+    /** Builds a SUBSCRIBE message (packet id 1) carrying a single QoS-1 subscription to "test". */
+    private static MqttSubscribeMessage newSubscribeMessage() {
+        MqttTopicSubscription topicSubscription = new MqttTopicSubscription("test", MqttQoS.AT_LEAST_ONCE);
+        return new MqttSubscribeMessage(
+            new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 1),
+            MqttMessageIdVariableHeader.from(1),
+            new MqttSubscribePayload(Arrays.asList(topicSubscription)));
+    }
+
+}
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestUnSubscribeProcessor.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestUnSubscribeProcessor.java
new file mode 100644
index 0000000..3c1c3af
--- /dev/null
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/TestUnSubscribeProcessor.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.ds.test;
+
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttSubscribePayload;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
+import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
+import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
+import org.apache.rocketmq.mqtt.ds.upstream.processor.SubscribeProcessor;
+import org.apache.rocketmq.mqtt.ds.upstream.processor.UnSubscribeProcessor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Arrays;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit test for {@link UnSubscribeProcessor}: an UNSUBSCRIBE message must trigger the
+ * first-topic check and be forwarded to the {@link SubscriptionPersistManager}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestUnSubscribeProcessor {
+
+    @Mock
+    private FirstTopicManager firstTopicManager;
+
+    @Mock
+    private SubscriptionPersistManager subscriptionPersistManager;
+
+    /**
+     * Sends an UNSUBSCRIBE for topic "test" through the processor and checks that both
+     * collaborators were invoked.
+     */
+    @Test
+    public void test() throws IllegalAccessException {
+        UnSubscribeProcessor unSubscribeProcessor = new UnSubscribeProcessor();
+        // The processor's collaborators are private fields, so they are set reflectively.
+        FieldUtils.writeDeclaredField(unSubscribeProcessor, "firstTopicManager", firstTopicManager, true);
+        FieldUtils.writeDeclaredField(unSubscribeProcessor, "subscriptionPersistManager", subscriptionPersistManager, true);
+
+        MqttMessageUpContext context = new MqttMessageUpContext();
+        context.setClientId("test");
+
+        // The header type must match the message class being built: UNSUBSCRIBE, not SUBSCRIBE.
+        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 1);
+        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(1);
+        MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Arrays.asList("test"));
+        MqttUnsubscribeMessage mqttUnsubscribeMessage = new MqttUnsubscribeMessage(mqttFixedHeader, variableHeader, payload);
+
+        unSubscribeProcessor.process(context, mqttUnsubscribeMessage);
+        verify(firstTopicManager).checkFirstTopicIfCreated(any());
+        verify(subscriptionPersistManager).removeSubscriptions(any(), anySet());
+    }
+
+}