You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by br...@apache.org on 2023/01/23 15:18:10 UTC
[activemq-artemis] branch main updated: ARTEMIS-4137 MQTT sub-queue clean-up can fail due to auth
This is an automated email from the ASF dual-hosted git repository.
brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b0ba8cae24 ARTEMIS-4137 MQTT sub-queue clean-up can fail due to auth
b0ba8cae24 is described below
commit b0ba8cae2404f33d6f2c9d3ee660c153be3edf48
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Jan 19 13:25:43 2023 -0600
ARTEMIS-4137 MQTT sub-queue clean-up can fail due to auth
---
.../core/protocol/mqtt/MQTTConnectionManager.java | 2 +-
.../artemis/core/protocol/mqtt/MQTTSession.java | 8 ++++----
.../core/protocol/mqtt/MQTTSubscriptionManager.java | 10 +++++++---
.../activemq/artemis/core/server/ServerSession.java | 2 ++
.../artemis/core/server/impl/ServerSessionImpl.java | 7 ++++++-
.../tests/integration/mqtt5/MQTT5TestSupport.java | 13 ++++++++++++-
.../spec/controlpackets/SubscribeTestsWithSecurity.java | 17 +++++++++++++++++
7 files changed, 49 insertions(+), 10 deletions(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index fe1b9c50c6..1cdcc149e4 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -83,7 +83,7 @@ public class MQTTConnectionManager {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
- session.clean();
+ session.clean(true);
session.setClean(true);
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 2cc34690b5..5e87d7a459 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -132,7 +132,7 @@ public class MQTTSession {
// If the session expires the will message must be sent no matter the will delay
sendWillMessage();
}
- clean();
+ clean(false);
protocolManager.removeSessionState(connection.getClientID());
} else {
state.setDisconnectedTime(System.currentTimeMillis());
@@ -142,7 +142,7 @@ public class MQTTSession {
sendWillMessage();
}
if (isClean()) {
- clean();
+ clean(false);
protocolManager.removeSessionState(connection.getClientID());
}
}
@@ -222,8 +222,8 @@ public class MQTTSession {
return protocolManager;
}
- void clean() throws Exception {
- subscriptionManager.clean();
+ void clean(boolean enforceSecurity) throws Exception {
+ subscriptionManager.clean(enforceSecurity);
mqttPublishManager.clean();
state.clear();
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 47f1917a58..f0a7f2ecc9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -252,6 +252,10 @@ public class MQTTSubscriptionManager {
}
private short removeSubscription(String address) {
+ return removeSubscription(address, true);
+ }
+
+ private short removeSubscription(String address, boolean enforceSecurity) {
if (session.getState().getSubscription(address) == null) {
return MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED;
}
@@ -290,7 +294,7 @@ public class MQTTSubscriptionManager {
if (queue.isConfigurationManaged()) {
queue.deleteAllReferences();
} else {
- session.getServerSession().deleteQueue(internalQueueName);
+ session.getServerSession().deleteQueue(internalQueueName, enforceSecurity);
}
}
} catch (Exception e) {
@@ -367,9 +371,9 @@ public class MQTTSubscriptionManager {
return consumerQoSLevels;
}
- void clean() {
+ void clean(boolean enforceSecurity) {
for (MqttTopicSubscription mqttTopicSubscription : session.getState().getSubscriptions()) {
- removeSubscription(mqttTopicSubscription.topicName());
+ removeSubscription(mqttTopicSubscription.topicName(), enforceSecurity);
}
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 2d880f1ec8..e1f88157b3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -308,6 +308,8 @@ public interface ServerSession extends SecurityAuth {
void deleteQueue(SimpleString name) throws Exception;
+ void deleteQueue(SimpleString name, boolean enforceSecurity) throws Exception;
+
ServerConsumer createConsumer(long consumerID,
SimpleString queueName,
SimpleString filterString,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index ce02f88e2f..1df10adde6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1167,6 +1167,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void deleteQueue(final SimpleString queueToDelete) throws Exception {
+ deleteQueue(queueToDelete, true);
+ }
+
+ @Override
+ public void deleteQueue(final SimpleString queueToDelete, boolean enforceSecurity) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.destroyQueue(this, remotingConnection.getSubject(), remotingConnection.getRemoteAddress(), queueToDelete);
}
@@ -1178,7 +1183,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw new ActiveMQNonExistentQueueException();
}
- server.destroyQueue(unPrefixedQueueName, this, true, false, true);
+ server.destroyQueue(unPrefixedQueueName, enforceSecurity ? this : null, true, false, true);
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(unPrefixedQueueName);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
index d9ad8181f5..6d4abc9c6a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
@@ -142,6 +142,9 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
protected String fullUser = "user";
protected String fullPass = "pass";
+ protected String noDeleteUser = "noDelete";
+ protected String noDeletePass = "noDelete";
+
@Rule
public TestName name = new TestName();
@@ -212,6 +215,8 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
securityManager.getConfiguration().addRole(guestUser, "guest");
securityManager.getConfiguration().addUser(fullUser, fullPass);
securityManager.getConfiguration().addRole(fullUser, "full");
+ securityManager.getConfiguration().addUser(noDeleteUser, noDeleteUser);
+ securityManager.getConfiguration().addRole(noDeleteUser, "noDelete");
// Configure roles
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
@@ -221,6 +226,7 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
value.add(new Role("guest", false, true, false, false, false, false, false, true, false, false));
value.add(new Role("full", true, true, true, true, true, true, true, true, true, true));
value.add(new Role("createAddress", false, false, false, false, false, false, false, false, true, false));
+ value.add(new Role("noDelete", true, true, true, false, true, false, true, true, true, true));
securityRepository.addMatch("#", value);
server.getConfiguration().setSecurityEnabled(true);
@@ -344,7 +350,12 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
protected Queue getSubscriptionQueue(String TOPIC) {
try {
- return ((LocalQueueBinding)server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(TOPIC)).getBindings().toArray()[0]).getQueue();
+ Object[] array = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(TOPIC)).getBindings().toArray();
+ if (array.length == 0) {
+ return null;
+ } else {
+ return ((LocalQueueBinding)array[0]).getQueue();
+ }
} catch (Exception e) {
e.printStackTrace();
return null;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
index 8f1b716eee..0492755e8c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
@@ -92,4 +93,20 @@ public class SubscribeTestsWithSecurity extends MQTT5TestSupport {
client.disconnect();
}
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testSubscriptionQueueRemoved() throws Exception {
+ final String CLIENT_ID = "consumer";
+ MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
+ .username(noDeleteUser)
+ .password(noDeletePass.getBytes(StandardCharsets.UTF_8))
+ .build();
+ MqttClient client = createPahoClient(CLIENT_ID);
+ client.connect(options);
+
+ client.subscribe(getTopicName(), 0).waitForCompletion();
+ client.disconnect();
+
+ Wait.assertTrue(() -> getSubscriptionQueue(getTopicName()) == null, 2000, 100);
+ }
}