You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by br...@apache.org on 2023/01/23 15:18:10 UTC

[activemq-artemis] branch main updated: ARTEMIS-4137 MQTT sub-queue clean-up can fail due to auth

This is an automated email from the ASF dual-hosted git repository.

brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new b0ba8cae24 ARTEMIS-4137 MQTT sub-queue clean-up can fail due to auth
b0ba8cae24 is described below

commit b0ba8cae2404f33d6f2c9d3ee660c153be3edf48
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Jan 19 13:25:43 2023 -0600

    ARTEMIS-4137 MQTT sub-queue clean-up can fail due to auth
---
 .../core/protocol/mqtt/MQTTConnectionManager.java       |  2 +-
 .../artemis/core/protocol/mqtt/MQTTSession.java         |  8 ++++----
 .../core/protocol/mqtt/MQTTSubscriptionManager.java     | 10 +++++++---
 .../activemq/artemis/core/server/ServerSession.java     |  2 ++
 .../artemis/core/server/impl/ServerSessionImpl.java     |  7 ++++++-
 .../tests/integration/mqtt5/MQTT5TestSupport.java       | 13 ++++++++++++-
 .../spec/controlpackets/SubscribeTestsWithSecurity.java | 17 +++++++++++++++++
 7 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index fe1b9c50c6..1cdcc149e4 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -83,7 +83,7 @@ public class MQTTConnectionManager {
             /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
              * start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
              * MUST NOT be reused in any subsequent Session */
-            session.clean();
+            session.clean(true);
             session.setClean(true);
          }
 
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 2cc34690b5..5e87d7a459 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -132,7 +132,7 @@ public class MQTTSession {
                   // If the session expires the will message must be sent no matter the will delay
                   sendWillMessage();
                }
-               clean();
+               clean(false);
                protocolManager.removeSessionState(connection.getClientID());
             } else {
                state.setDisconnectedTime(System.currentTimeMillis());
@@ -142,7 +142,7 @@ public class MQTTSession {
                sendWillMessage();
             }
             if (isClean()) {
-               clean();
+               clean(false);
                protocolManager.removeSessionState(connection.getClientID());
             }
          }
@@ -222,8 +222,8 @@ public class MQTTSession {
       return protocolManager;
    }
 
-   void clean() throws Exception {
-      subscriptionManager.clean();
+   void clean(boolean enforceSecurity) throws Exception {
+      subscriptionManager.clean(enforceSecurity);
       mqttPublishManager.clean();
       state.clear();
    }
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 47f1917a58..f0a7f2ecc9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -252,6 +252,10 @@ public class MQTTSubscriptionManager {
    }
 
    private short removeSubscription(String address) {
+      return removeSubscription(address, true);
+   }
+
+   private short removeSubscription(String address, boolean enforceSecurity) {
       if (session.getState().getSubscription(address) == null) {
          return MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED;
       }
@@ -290,7 +294,7 @@ public class MQTTSubscriptionManager {
             if (queue.isConfigurationManaged()) {
                queue.deleteAllReferences();
             } else {
-               session.getServerSession().deleteQueue(internalQueueName);
+               session.getServerSession().deleteQueue(internalQueueName, enforceSecurity);
             }
          }
       } catch (Exception e) {
@@ -367,9 +371,9 @@ public class MQTTSubscriptionManager {
       return consumerQoSLevels;
    }
 
-   void clean() {
+   void clean(boolean enforceSecurity) {
       for (MqttTopicSubscription mqttTopicSubscription : session.getState().getSubscriptions()) {
-         removeSubscription(mqttTopicSubscription.topicName());
+         removeSubscription(mqttTopicSubscription.topicName(), enforceSecurity);
       }
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 2d880f1ec8..e1f88157b3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -308,6 +308,8 @@ public interface ServerSession extends SecurityAuth {
 
    void deleteQueue(SimpleString name) throws Exception;
 
+   void deleteQueue(SimpleString name, boolean enforceSecurity) throws Exception;
+
    ServerConsumer createConsumer(long consumerID,
                                  SimpleString queueName,
                                  SimpleString filterString,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index ce02f88e2f..1df10adde6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1167,6 +1167,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public void deleteQueue(final SimpleString queueToDelete) throws Exception {
+      deleteQueue(queueToDelete, true);
+   }
+
+   @Override
+   public void deleteQueue(final SimpleString queueToDelete, boolean enforceSecurity) throws Exception {
       if (AuditLogger.isBaseLoggingEnabled()) {
          AuditLogger.destroyQueue(this, remotingConnection.getSubject(), remotingConnection.getRemoteAddress(), queueToDelete);
       }
@@ -1178,7 +1183,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          throw new ActiveMQNonExistentQueueException();
       }
 
-      server.destroyQueue(unPrefixedQueueName, this, true, false, true);
+      server.destroyQueue(unPrefixedQueueName, enforceSecurity ? this : null, true, false, true);
 
       TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(unPrefixedQueueName);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
index d9ad8181f5..6d4abc9c6a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
@@ -142,6 +142,9 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
    protected String fullUser = "user";
    protected String fullPass = "pass";
 
+   protected String noDeleteUser = "noDelete";
+   protected String noDeletePass = "noDelete";
+
    @Rule
    public TestName name = new TestName();
 
@@ -212,6 +215,8 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
          securityManager.getConfiguration().addRole(guestUser, "guest");
          securityManager.getConfiguration().addUser(fullUser, fullPass);
          securityManager.getConfiguration().addRole(fullUser, "full");
+         securityManager.getConfiguration().addUser(noDeleteUser, noDeleteUser);
+         securityManager.getConfiguration().addRole(noDeleteUser, "noDelete");
 
          // Configure roles
          HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
@@ -221,6 +226,7 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
          value.add(new Role("guest", false, true, false, false, false, false, false, true, false, false));
          value.add(new Role("full", true, true, true, true, true, true, true, true, true, true));
          value.add(new Role("createAddress", false, false, false, false, false, false, false, false, true, false));
+         value.add(new Role("noDelete", true, true, true, false, true, false, true, true, true, true));
          securityRepository.addMatch("#", value);
 
          server.getConfiguration().setSecurityEnabled(true);
@@ -344,7 +350,12 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
 
    protected Queue getSubscriptionQueue(String TOPIC) {
       try {
-         return ((LocalQueueBinding)server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(TOPIC)).getBindings().toArray()[0]).getQueue();
+         Object[] array = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(TOPIC)).getBindings().toArray();
+         if (array.length == 0) {
+            return null;
+         } else {
+            return ((LocalQueueBinding)array[0]).getQueue();
+         }
       } catch (Exception e) {
          e.printStackTrace();
          return null;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
index 8f1b716eee..0492755e8c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
 import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.eclipse.paho.mqttv5.client.IMqttToken;
 import org.eclipse.paho.mqttv5.client.MqttClient;
 import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
@@ -92,4 +93,20 @@ public class SubscribeTestsWithSecurity extends MQTT5TestSupport {
 
       client.disconnect();
    }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testSubscriptionQueueRemoved() throws Exception {
+      final String CLIENT_ID = "consumer";
+      MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
+         .username(noDeleteUser)
+         .password(noDeletePass.getBytes(StandardCharsets.UTF_8))
+         .build();
+      MqttClient client = createPahoClient(CLIENT_ID);
+      client.connect(options);
+
+      client.subscribe(getTopicName(), 0).waitForCompletion();
+      client.disconnect();
+
+      Wait.assertTrue(() -> getSubscriptionQueue(getTopicName()) == null, 2000, 100);
+   }
 }