You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2023/01/09 16:46:11 UTC

[activemq-artemis] branch main updated: ARTEMIS-4126 address not auto-created when sending MQTT msg

This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new e531c61c95 ARTEMIS-4126 address not auto-created when sending MQTT msg
e531c61c95 is described below

commit e531c61c95b579373e7bfa78b0e8dba4ee4eb8c1
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Jan 6 23:12:11 2023 -0600

    ARTEMIS-4126 address not auto-created when sending MQTT msg
---
 .../core/protocol/mqtt/MQTTPublishManager.java     |  8 +++-
 .../artemis/core/protocol/mqtt/MQTTUtil.java       |  4 +-
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 28 +++++++++++++
 .../tests/integration/mqtt5/MQTT5TestSupport.java  |  6 +++
 .../controlpackets/PublishTestsWithSecurity.java   | 49 +++++++++++++++++++++-
 5 files changed, 89 insertions(+), 6 deletions(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index f1cb8c5677..c65cf2186b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -199,8 +199,9 @@ public class MQTTPublishManager {
                topic = message.variableHeader().topicName();
             }
          }
-
-         Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, message);
+         String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(topic, session.getWildcardConfiguration());
+         SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
+         Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, address, message);
          int qos = message.fixedHeader().qosLevel().value();
          if (qos > 0) {
             serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
@@ -213,6 +214,9 @@ public class MQTTPublishManager {
 
             Transaction tx = session.getServerSession().newTransaction();
             try {
+               if (session.getServer().getAddressInfo(address) == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
+                  session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
+               }
                session.getServerSession().send(tx, serverMessage, true, false);
 
                if (message.fixedHeader().isRetain()) {
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 91c092db01..b7bc1f8b9d 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -233,10 +233,8 @@ public class MQTTUtil {
    }
 
    public static Message createServerMessageFromByteBuf(MQTTSession session,
-                                                              String topic,
+                                                              SimpleString address,
                                                               MqttPublishMessage mqttPublishMessage) {
-      String coreAddress = convertMqttTopicFilterToCoreAddress(topic, session.getWildcardConfiguration());
-      SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
       ICoreMessage message = createServerMessage(session, address, mqttPublishMessage);
 
       ByteBuf payload = mqttPublishMessage.payload();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index d88f4a6981..26f00e7a18 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -80,6 +80,34 @@ public class MQTT5Test extends MQTT5TestSupport {
       context.close();
    }
 
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testAddressAutoCreation() throws Exception {
+      final String DESTINATION = RandomUtil.randomString();
+      server.getAddressSettingsRepository().addMatch(DESTINATION, new AddressSettings().setAutoCreateAddresses(true));
+
+      MqttClient producer = createPahoClient(RandomUtil.randomString());
+      producer.connect();
+      producer.publish(DESTINATION, new byte[0], 0, false);
+      producer.disconnect();
+      producer.close();
+
+      Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(DESTINATION)) != null, 2000, 100);
+   }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testAddressAutoCreationNegative() throws Exception {
+      final String DESTINATION = RandomUtil.randomString();
+      server.getAddressSettingsRepository().addMatch(DESTINATION, new AddressSettings().setAutoCreateAddresses(false));
+
+      MqttClient producer = createPahoClient(RandomUtil.randomString());
+      producer.connect();
+      producer.publish(DESTINATION, new byte[0], 0, false);
+      producer.disconnect();
+      producer.close();
+
+      assertTrue(server.getAddressInfo(SimpleString.toSimpleString(DESTINATION)) == null);
+   }
+
    /*
     * Trying to reproduce error from https://issues.apache.org/jira/browse/ARTEMIS-1184
     */
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
index 6891d0311e..d9ad8181f5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
@@ -130,6 +130,9 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
    protected String noprivUser = "noprivs";
    protected String noprivPass = "noprivs";
 
+   protected String createAddressUser = "createAddress";
+   protected String createAddressPass = "createAddress";
+
    protected String browseUser = "browser";
    protected String browsePass = "browser";
 
@@ -201,6 +204,8 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
          // User additions
          securityManager.getConfiguration().addUser(noprivUser, noprivPass);
          securityManager.getConfiguration().addRole(noprivUser, "nothing");
+         securityManager.getConfiguration().addUser(createAddressUser, createAddressPass);
+         securityManager.getConfiguration().addRole(createAddressUser, "createAddress");
          securityManager.getConfiguration().addUser(browseUser, browsePass);
          securityManager.getConfiguration().addRole(browseUser, "browser");
          securityManager.getConfiguration().addUser(guestUser, guestPass);
@@ -215,6 +220,7 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
          value.add(new Role("browser", false, false, false, false, false, false, false, true, false, false));
          value.add(new Role("guest", false, true, false, false, false, false, false, true, false, false));
          value.add(new Role("full", true, true, true, true, true, true, true, true, true, true));
+         value.add(new Role("createAddress", false, false, false, false, false, false, false, false, true, false));
          securityRepository.addMatch("#", value);
 
          server.getConfiguration().setSecurityEnabled(true);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
index a7749236d0..d3c4704bf7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
@@ -20,9 +20,14 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
+import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
 import org.eclipse.paho.mqttv5.client.MqttClient;
 import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
 import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
@@ -42,8 +47,9 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
    }
 
    @Test(timeout = DEFAULT_TIMEOUT)
-   public void testAuthorizationFailure() throws Exception {
+   public void testCreateAddressAuthorizationFailure() throws Exception {
       final String CLIENT_ID = "publisher";
+      final CountDownLatch latch = new CountDownLatch(1);
       MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
          .username(noprivUser)
          .password(noprivPass.getBytes(StandardCharsets.UTF_8))
@@ -51,6 +57,43 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
       MqttClient client = createPahoClient(CLIENT_ID);
       client.connect(options);
 
+      server.getManagementService().addNotificationListener(notification -> {
+         if (notification.getType() == CoreNotificationType.SECURITY_PERMISSION_VIOLATION && CheckType.valueOf(notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_CHECK_TYPE).toString()) == CheckType.CREATE_ADDRESS) {
+            latch.countDown();
+         }
+      });
+
+      try {
+         client.publish("/foo", new byte[0], 2, false);
+         fail("Publishing should have failed with a security problem");
+      } catch (MqttException e) {
+         assertEquals(MQTTReasonCodes.NOT_AUTHORIZED, (byte) e.getReasonCode());
+      } catch (Exception e) {
+         fail("Should have thrown an MqttException");
+      }
+
+      assertTrue(latch.await(2, TimeUnit.SECONDS));
+
+      assertFalse(client.isConnected());
+   }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testSendAuthorizationFailure() throws Exception {
+      final String CLIENT_ID = "publisher";
+      final CountDownLatch latch = new CountDownLatch(1);
+      MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
+         .username(createAddressUser)
+         .password(createAddressPass.getBytes(StandardCharsets.UTF_8))
+         .build();
+      MqttClient client = createPahoClient(CLIENT_ID);
+      client.connect(options);
+
+      server.getManagementService().addNotificationListener(notification -> {
+         if (notification.getType() == CoreNotificationType.SECURITY_PERMISSION_VIOLATION && CheckType.valueOf(notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_CHECK_TYPE).toString()) == CheckType.SEND) {
+            latch.countDown();
+         }
+      });
+
       try {
          client.publish("/foo", new byte[0], 2, false);
          fail("Publishing should have failed with a security problem");
@@ -60,7 +103,11 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
          fail("Should have thrown an MqttException");
       }
 
+      assertTrue(latch.await(2, TimeUnit.SECONDS));
+
       assertFalse(client.isConnected());
+
+      Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(".foo")) != null, 2000, 100);
    }
 
    @Test(timeout = DEFAULT_TIMEOUT)