You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@activemq.apache.org by jb...@apache.org on 2023/01/09 16:46:11 UTC
[activemq-artemis] branch main updated: ARTEMIS-4126 address not auto-created when sending MQTT msg
This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new e531c61c95 ARTEMIS-4126 address not auto-created when sending MQTT msg
e531c61c95 is described below
commit e531c61c95b579373e7bfa78b0e8dba4ee4eb8c1
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Jan 6 23:12:11 2023 -0600
ARTEMIS-4126 address not auto-created when sending MQTT msg
---
.../core/protocol/mqtt/MQTTPublishManager.java | 8 +++-
.../artemis/core/protocol/mqtt/MQTTUtil.java | 4 +-
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 28 +++++++++++++
.../tests/integration/mqtt5/MQTT5TestSupport.java | 6 +++
.../controlpackets/PublishTestsWithSecurity.java | 49 +++++++++++++++++++++-
5 files changed, 89 insertions(+), 6 deletions(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index f1cb8c5677..c65cf2186b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -199,8 +199,9 @@ public class MQTTPublishManager {
topic = message.variableHeader().topicName();
}
}
-
- Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, message);
+ String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(topic, session.getWildcardConfiguration());
+ SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
+ Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, address, message);
int qos = message.fixedHeader().qosLevel().value();
if (qos > 0) {
serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
@@ -213,6 +214,9 @@ public class MQTTPublishManager {
Transaction tx = session.getServerSession().newTransaction();
try {
+ if (session.getServer().getAddressInfo(address) == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
+ session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
+ }
session.getServerSession().send(tx, serverMessage, true, false);
if (message.fixedHeader().isRetain()) {
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 91c092db01..b7bc1f8b9d 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -233,10 +233,8 @@ public class MQTTUtil {
}
public static Message createServerMessageFromByteBuf(MQTTSession session,
- String topic,
+ SimpleString address,
MqttPublishMessage mqttPublishMessage) {
- String coreAddress = convertMqttTopicFilterToCoreAddress(topic, session.getWildcardConfiguration());
- SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
ICoreMessage message = createServerMessage(session, address, mqttPublishMessage);
ByteBuf payload = mqttPublishMessage.payload();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index d88f4a6981..26f00e7a18 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -80,6 +80,34 @@ public class MQTT5Test extends MQTT5TestSupport {
context.close();
}
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testAddressAutoCreation() throws Exception {
+ final String DESTINATION = RandomUtil.randomString();
+ server.getAddressSettingsRepository().addMatch(DESTINATION, new AddressSettings().setAutoCreateAddresses(true));
+
+ MqttClient producer = createPahoClient(RandomUtil.randomString());
+ producer.connect();
+ producer.publish(DESTINATION, new byte[0], 0, false);
+ producer.disconnect();
+ producer.close();
+
+ Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(DESTINATION)) != null, 2000, 100);
+ }
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testAddressAutoCreationNegative() throws Exception {
+ final String DESTINATION = RandomUtil.randomString();
+ server.getAddressSettingsRepository().addMatch(DESTINATION, new AddressSettings().setAutoCreateAddresses(false));
+
+ MqttClient producer = createPahoClient(RandomUtil.randomString());
+ producer.connect();
+ producer.publish(DESTINATION, new byte[0], 0, false);
+ producer.disconnect();
+ producer.close();
+
+ assertTrue(server.getAddressInfo(SimpleString.toSimpleString(DESTINATION)) == null);
+ }
+
/*
* Trying to reproduce error from https://issues.apache.org/jira/browse/ARTEMIS-1184
*/
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
index 6891d0311e..d9ad8181f5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
@@ -130,6 +130,9 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
protected String noprivUser = "noprivs";
protected String noprivPass = "noprivs";
+ protected String createAddressUser = "createAddress";
+ protected String createAddressPass = "createAddress";
+
protected String browseUser = "browser";
protected String browsePass = "browser";
@@ -201,6 +204,8 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
// User additions
securityManager.getConfiguration().addUser(noprivUser, noprivPass);
securityManager.getConfiguration().addRole(noprivUser, "nothing");
+ securityManager.getConfiguration().addUser(createAddressUser, createAddressPass);
+ securityManager.getConfiguration().addRole(createAddressUser, "createAddress");
securityManager.getConfiguration().addUser(browseUser, browsePass);
securityManager.getConfiguration().addRole(browseUser, "browser");
securityManager.getConfiguration().addUser(guestUser, guestPass);
@@ -215,6 +220,7 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
value.add(new Role("browser", false, false, false, false, false, false, false, true, false, false));
value.add(new Role("guest", false, true, false, false, false, false, false, true, false, false));
value.add(new Role("full", true, true, true, true, true, true, true, true, true, true));
+ value.add(new Role("createAddress", false, false, false, false, false, false, false, false, true, false));
securityRepository.addMatch("#", value);
server.getConfiguration().setSecurityEnabled(true);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
index a7749236d0..d3c4704bf7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
@@ -20,9 +20,14 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
+import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
@@ -42,8 +47,9 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
}
@Test(timeout = DEFAULT_TIMEOUT)
- public void testAuthorizationFailure() throws Exception {
+ public void testCreateAddressAuthorizationFailure() throws Exception {
final String CLIENT_ID = "publisher";
+ final CountDownLatch latch = new CountDownLatch(1);
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.username(noprivUser)
.password(noprivPass.getBytes(StandardCharsets.UTF_8))
@@ -51,6 +57,43 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
MqttClient client = createPahoClient(CLIENT_ID);
client.connect(options);
+ server.getManagementService().addNotificationListener(notification -> {
+ if (notification.getType() == CoreNotificationType.SECURITY_PERMISSION_VIOLATION && CheckType.valueOf(notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_CHECK_TYPE).toString()) == CheckType.CREATE_ADDRESS) {
+ latch.countDown();
+ }
+ });
+
+ try {
+ client.publish("/foo", new byte[0], 2, false);
+ fail("Publishing should have failed with a security problem");
+ } catch (MqttException e) {
+ assertEquals(MQTTReasonCodes.NOT_AUTHORIZED, (byte) e.getReasonCode());
+ } catch (Exception e) {
+ fail("Should have thrown an MqttException");
+ }
+
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
+
+ assertFalse(client.isConnected());
+ }
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testSendAuthorizationFailure() throws Exception {
+ final String CLIENT_ID = "publisher";
+ final CountDownLatch latch = new CountDownLatch(1);
+ MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
+ .username(createAddressUser)
+ .password(createAddressPass.getBytes(StandardCharsets.UTF_8))
+ .build();
+ MqttClient client = createPahoClient(CLIENT_ID);
+ client.connect(options);
+
+ server.getManagementService().addNotificationListener(notification -> {
+ if (notification.getType() == CoreNotificationType.SECURITY_PERMISSION_VIOLATION && CheckType.valueOf(notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_CHECK_TYPE).toString()) == CheckType.SEND) {
+ latch.countDown();
+ }
+ });
+
try {
client.publish("/foo", new byte[0], 2, false);
fail("Publishing should have failed with a security problem");
@@ -60,7 +103,11 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
fail("Should have thrown an MqttException");
}
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
+
assertFalse(client.isConnected());
+
+ Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(".foo")) != null, 2000, 100);
}
@Test(timeout = DEFAULT_TIMEOUT)