You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/06/14 18:48:50 UTC

[activemq-artemis] branch main updated: ARTEMIS-3851 MQTT sub q exists after restart despite CleanSession=1

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c9208aafda ARTEMIS-3851 MQTT sub q exists after restart despite CleanSession=1
c9208aafda is described below

commit c9208aafda81e15c31bbcda3a99557f3fa29dd09
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Jun 2 14:19:53 2022 -0500

    ARTEMIS-3851 MQTT sub q exists after restart despite CleanSession=1
    
    MQTT 3.1 and 3.1.1 clients using a clean session should have a
    *non-durable* subscription queue. If the broker restarts the queue
    should be removed. This is due to [MQTT-3.1.2-6] which states that the
    session (and any state) must last only as long as the network
    connection.
---
 .../core/protocol/mqtt/MQTTSubscriptionManager.java    | 18 ++++++++++++------
 docs/user-manual/en/versions.md                        | 15 +++++++++++++++
 .../tests/integration/mqtt/MQTTQueueCleanTest.java     | 15 +++++++++++++++
 .../integration/mqtt/PahoMQTTQOS2SecurityTest.java     |  2 +-
 .../artemis/tests/integration/mqtt5/MQTT5Test.java     | 17 +++++++++++++++++
 5 files changed, 60 insertions(+), 7 deletions(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 75b6bec340..1914a5dfba 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -119,7 +119,7 @@ public class MQTTSubscriptionManager {
       int qos = subscription.qualityOfService().value();
       String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(topicName, session.getWildcardConfiguration());
 
-      Queue q = createQueueForSubscription(coreAddress, qos, sharedSubscriptionName);
+      Queue q = createQueueForSubscription(coreAddress, sharedSubscriptionName);
 
       if (initialStart) {
          createConsumerForSubscriptionQueue(q, topicName, qos, subscription.option().isNoLocal(), null);
@@ -153,7 +153,7 @@ public class MQTTSubscriptionManager {
       }
    }
 
-   private Queue createQueueForSubscription(String address, int qos, String sharedSubscriptionName) throws Exception {
+   private Queue createQueueForSubscription(String address, String sharedSubscriptionName) throws Exception {
       // determine the proper queue name
       SimpleString queue;
       if (sharedSubscriptionName != null) {
@@ -184,14 +184,20 @@ public class MQTTSubscriptionManager {
             addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address),
                                                                    RoutingType.MULTICAST, true);
          }
-         return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos);
+         return findOrCreateQueue(bindingQueryResult, addressInfo, queue);
       }
       return q;
    }
 
-   private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue, int qos) throws Exception {
+   private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue) throws Exception {
+      /*
+       * MQTT 3.1 and 3.1.1 clients using a clean session should have a *non-durable* subscription queue so that the
+       * queue is removed if the broker restarts. This is due to [MQTT-3.1.2-6], which states that the session (and any
+       * state) must last only as long as the network connection. MQTT 5 subscription queues are always durable here.
+       */
+      boolean durable = session.getVersion() == MQTTVersion.MQTT_5 || (session.getVersion() != MQTTVersion.MQTT_5 && !session.isClean());
       if (addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) {
-         return session.getServerSession().createQueue(new QueueConfiguration(queue).setAddress(addressInfo.getName()).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(MQTTUtil.DURABLE_MESSAGES && qos >= 0));
+         return session.getServerSession().createQueue(new QueueConfiguration(queue).setAddress(addressInfo.getName()).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(durable));
       }
 
       if (addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
@@ -207,7 +213,7 @@ public class MQTTSubscriptionManager {
             return session.getServer().locateQueue(name);
          } else {
             try {
-               return session.getServerSession().createQueue(new QueueConfiguration(addressInfo.getName()).setRoutingType(RoutingType.ANYCAST).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(MQTTUtil.DURABLE_MESSAGES && qos >= 0));
+               return session.getServerSession().createQueue(new QueueConfiguration(addressInfo.getName()).setRoutingType(RoutingType.ANYCAST).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(durable));
             } catch (ActiveMQQueueExistsException e) {
                return session.getServer().locateQueue(addressInfo.getName());
             }
diff --git a/docs/user-manual/en/versions.md b/docs/user-manual/en/versions.md
index 5318004940..df21205a9c 100644
--- a/docs/user-manual/en/versions.md
+++ b/docs/user-manual/en/versions.md
@@ -8,6 +8,21 @@ This chapter provides the following information for each release:
   - **Note:** Follow the general upgrade procedure outlined in the [Upgrading the Broker](upgrading.md) 
     chapter in addition to any version-specific upgrade instructions outlined here.
 
+## 2.24.0
+[Full release notes](TBD).
+
+Highlights:
+- TBD
+
+#### Upgrading from older versions
+
+Due to [ARTEMIS-3851](https://issues.apache.org/jira/browse/ARTEMIS-3851), the queue
+created for an MQTT 3.x subscriber using `CleanSession=1` is now **non-durable**
+rather than durable. This may affect `security-settings` for MQTT clients whose
+role previously had only the `createDurableQueue` permission; that role now needs
+`createNonDurableQueue` as well. This change affects only MQTT 3.x clients
+using `CleanSession=1`.
+
 ## 2.23.0
 [Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12351677).
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTQueueCleanTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTQueueCleanTest.java
index e6889f0a63..355315bece 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTQueueCleanTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTQueueCleanTest.java
@@ -73,6 +73,21 @@ public class MQTTQueueCleanTest extends MQTTTestSupport {
       }
    }
 
+   @Test
+   public void testQueueCleanOnRestart() throws Exception {
+      String topic = "clean/test";
+      String clientId = "mqtt-client";
+      String queueName = "mqtt-client.clean.test";
+
+      MQTTClientProvider clientProvider = getMQTTClientProvider();
+      clientProvider.setClientId(clientId);
+      initializeConnection(clientProvider);
+      clientProvider.subscribe(topic, AT_LEAST_ONCE);
+      server.stop();
+      server.start();
+      Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) == null, 5000, 10);
+   }
+
    @Test
    public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() throws Exception {
       Random random = new Random();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
index bb9e985dc0..f70f33facc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
@@ -69,7 +69,7 @@ public class PahoMQTTQOS2SecurityTest extends MQTTTestSupport {
       MqttClient consumer = createPahoClient("consumerId");
       MqttClient producer = createPahoClient("producerId");
       MqttConnectOptions conOpt = new MqttConnectOptions();
-      conOpt.setCleanSession(true);
+      conOpt.setCleanSession(false);
       conOpt.setUserName(user1);
       conOpt.setPassword(password1.toCharArray());
       consumer.connect(conOpt);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 50ecd97a11..c2af69ede9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -208,4 +208,21 @@ public class MQTT5Test extends MQTT5TestSupport {
       scanSessions();
       assertEquals(0, server.locateQueue("DLA").getMessageCount());
    }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testQueueCleanOnRestart() throws Exception {
+      String topic = RandomUtil.randomString();
+      String clientId = RandomUtil.randomString();
+
+      MqttClient client = createPahoClient(clientId);
+      MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
+         .sessionExpiryInterval(999L)
+         .cleanStart(true)
+         .build();
+      client.connect(options);
+      client.subscribe(topic, AT_LEAST_ONCE);
+      server.stop();
+      server.start();
+      org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> getSubscriptionQueue(topic, clientId) != null, 3000, 10);
+   }
 }