You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/06/14 18:48:50 UTC
[activemq-artemis] branch main updated: ARTEMIS-3851 MQTT sub q exists after restart despite CleanSession=1
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c9208aafda ARTEMIS-3851 MQTT sub q exists after restart despite CleanSession=1
c9208aafda is described below
commit c9208aafda81e15c31bbcda3a99557f3fa29dd09
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Jun 2 14:19:53 2022 -0500
ARTEMIS-3851 MQTT sub q exists after restart despite CleanSession=1
MQTT 3.1 and 3.1.1 clients using a clean session should have a
*non-durable* subscription queue. If the broker restarts, the queue
should be removed. This is due to [MQTT-3.1.2-6], which states that the
session (and any state) must last only as long as the network
connection.
---
.../core/protocol/mqtt/MQTTSubscriptionManager.java | 18 ++++++++++++------
docs/user-manual/en/versions.md | 15 +++++++++++++++
.../tests/integration/mqtt/MQTTQueueCleanTest.java | 15 +++++++++++++++
.../integration/mqtt/PahoMQTTQOS2SecurityTest.java | 2 +-
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 17 +++++++++++++++++
5 files changed, 60 insertions(+), 7 deletions(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 75b6bec340..1914a5dfba 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -119,7 +119,7 @@ public class MQTTSubscriptionManager {
int qos = subscription.qualityOfService().value();
String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(topicName, session.getWildcardConfiguration());
- Queue q = createQueueForSubscription(coreAddress, qos, sharedSubscriptionName);
+ Queue q = createQueueForSubscription(coreAddress, sharedSubscriptionName);
if (initialStart) {
createConsumerForSubscriptionQueue(q, topicName, qos, subscription.option().isNoLocal(), null);
@@ -153,7 +153,7 @@ public class MQTTSubscriptionManager {
}
}
- private Queue createQueueForSubscription(String address, int qos, String sharedSubscriptionName) throws Exception {
+ private Queue createQueueForSubscription(String address, String sharedSubscriptionName) throws Exception {
// determine the proper queue name
SimpleString queue;
if (sharedSubscriptionName != null) {
@@ -184,14 +184,20 @@ public class MQTTSubscriptionManager {
addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address),
RoutingType.MULTICAST, true);
}
- return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos);
+ return findOrCreateQueue(bindingQueryResult, addressInfo, queue);
}
return q;
}
- private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue, int qos) throws Exception {
+ private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue) throws Exception {
+ /*
+ * MQTT 3.1 and 3.1.1 clients using a clean session should have a *non-durable* subscription queue. If the broker
+ * restarts the queue should be removed. This is due to [MQTT-3.1.2-6] which states that the session (and any
+ * state) must last only as long as the network connection.
+ */
+ boolean durable = session.getVersion() == MQTTVersion.MQTT_5 || (session.getVersion() != MQTTVersion.MQTT_5 && !session.isClean());
if (addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) {
- return session.getServerSession().createQueue(new QueueConfiguration(queue).setAddress(addressInfo.getName()).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(MQTTUtil.DURABLE_MESSAGES && qos >= 0));
+ return session.getServerSession().createQueue(new QueueConfiguration(queue).setAddress(addressInfo.getName()).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(durable));
}
if (addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
@@ -207,7 +213,7 @@ public class MQTTSubscriptionManager {
return session.getServer().locateQueue(name);
} else {
try {
- return session.getServerSession().createQueue(new QueueConfiguration(addressInfo.getName()).setRoutingType(RoutingType.ANYCAST).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(MQTTUtil.DURABLE_MESSAGES && qos >= 0));
+ return session.getServerSession().createQueue(new QueueConfiguration(addressInfo.getName()).setRoutingType(RoutingType.ANYCAST).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(durable));
} catch (ActiveMQQueueExistsException e) {
return session.getServer().locateQueue(addressInfo.getName());
}
diff --git a/docs/user-manual/en/versions.md b/docs/user-manual/en/versions.md
index 5318004940..df21205a9c 100644
--- a/docs/user-manual/en/versions.md
+++ b/docs/user-manual/en/versions.md
@@ -8,6 +8,21 @@ This chapter provides the following information for each release:
- **Note:** Follow the general upgrade procedure outlined in the [Upgrading the Broker](upgrading.md)
chapter in addition to any version-specific upgrade instructions outlined here.
+## 2.24.0
+[Full release notes](TBD).
+
+Highlights:
+- TBD
+
+#### Upgrading from older versions
+
+Due to [ARTEMIS-3851](https://issues.apache.org/jira/browse/ARTEMIS-3851) the queue
+created for an MQTT 3.x subscriber using `CleanSession=1` is now **non-durable**
+rather than durable. This may impact `security-settings` for MQTT clients whose
+role previously had only the `createDurableQueue` permission. Such roles will now need
+`createNonDurableQueue` as well. Again, this only has potential impact for MQTT 3.x
+clients using `CleanSession=1`.
+
## 2.23.0
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12351677).
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTQueueCleanTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTQueueCleanTest.java
index e6889f0a63..355315bece 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTQueueCleanTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTQueueCleanTest.java
@@ -73,6 +73,21 @@ public class MQTTQueueCleanTest extends MQTTTestSupport {
}
}
+ @Test
+ public void testQueueCleanOnRestart() throws Exception {
+ String topic = "clean/test";
+ String clientId = "mqtt-client";
+ String queueName = "mqtt-client.clean.test";
+
+ MQTTClientProvider clientProvider = getMQTTClientProvider();
+ clientProvider.setClientId(clientId);
+ initializeConnection(clientProvider);
+ clientProvider.subscribe(topic, AT_LEAST_ONCE);
+ server.stop();
+ server.start();
+ Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) == null, 5000, 10);
+ }
+
@Test
public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() throws Exception {
Random random = new Random();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
index bb9e985dc0..f70f33facc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
@@ -69,7 +69,7 @@ public class PahoMQTTQOS2SecurityTest extends MQTTTestSupport {
MqttClient consumer = createPahoClient("consumerId");
MqttClient producer = createPahoClient("producerId");
MqttConnectOptions conOpt = new MqttConnectOptions();
- conOpt.setCleanSession(true);
+ conOpt.setCleanSession(false);
conOpt.setUserName(user1);
conOpt.setPassword(password1.toCharArray());
consumer.connect(conOpt);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 50ecd97a11..c2af69ede9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -208,4 +208,21 @@ public class MQTT5Test extends MQTT5TestSupport {
scanSessions();
assertEquals(0, server.locateQueue("DLA").getMessageCount());
}
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testQueueCleanOnRestart() throws Exception {
+ String topic = RandomUtil.randomString();
+ String clientId = RandomUtil.randomString();
+
+ MqttClient client = createPahoClient(clientId);
+ MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
+ .sessionExpiryInterval(999L)
+ .cleanStart(true)
+ .build();
+ client.connect(options);
+ client.subscribe(topic, AT_LEAST_ONCE);
+ server.stop();
+ server.start();
+ org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> getSubscriptionQueue(topic, clientId) != null, 3000, 10);
+ }
}