You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/03/15 19:34:00 UTC
[activemq-artemis] branch main updated: ARTEMIS-3702 auth failures don't adhere to MQTT spec
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 2b5a25a ARTEMIS-3702 auth failures don't adhere to MQTT spec
2b5a25a is described below
commit 2b5a25a10611366fd4f643587efcf2292b0f548f
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Mar 3 22:04:20 2022 -0600
ARTEMIS-3702 auth failures don't adhere to MQTT spec
The commit includes the following changes:
- Don't drop the connection on subscribe or publish authorization
failures for 3.1 clients.
- Don't drop the connection on subscribe authorization failures for
3.1.1 clients.
- Add configuration parameter to control behavior on publish
authorization failures for 3.1.1 clients (either disconnect or not).
---
.../core/protocol/mqtt/MQTTConnectionManager.java | 11 +--
.../core/protocol/mqtt/MQTTProtocolHandler.java | 14 +--
.../core/protocol/mqtt/MQTTProtocolManager.java | 10 ++
.../core/protocol/mqtt/MQTTPublishManager.java | 41 ++++++--
.../artemis/core/protocol/mqtt/MQTTSession.java | 14 +--
.../protocol/mqtt/MQTTSubscriptionManager.java | 23 ++++-
.../artemis/core/protocol/mqtt/MQTTVersion.java | 49 +++++++++
docs/user-manual/en/mqtt.md | 22 ++++-
.../tests/integration/mqtt/MQTTSecurityTest.java | 109 +++++++++++++++++++++
9 files changed, 261 insertions(+), 32 deletions(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index c92e8c9..d56e18f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -58,9 +58,8 @@ public class MQTTConnectionManager {
}
void connect(MqttConnectMessage connect, String validatedUser) throws Exception {
- int packetVersion = connect.variableHeader().version();
- if (packetVersion == MqttVersion.MQTT_5.protocolLevel()) {
- session.set5(true);
+ session.setVersion(MQTTVersion.getVersion(connect.variableHeader().version()));
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setProtocolVersion(Byte.toString(MqttVersion.MQTT_5.protocolLevel()));
String authenticationMethod = MQTTUtil.getProperty(String.class, connect.variableHeader().properties(), AUTHENTICATION_METHOD);
@@ -121,7 +120,7 @@ public class MQTTConnectionManager {
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
session.getState().setWillTopic(connect.payload().willTopic());
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
MqttProperties willProperties = connect.payload().willProperties();
if (willProperties != null) {
MqttProperties.MqttProperty willDelayInterval = willProperties.getProperty(WILL_DELAY_INTERVAL.value());
@@ -133,7 +132,7 @@ public class MQTTConnectionManager {
}
MqttProperties connackProperties;
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));
sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
@@ -241,7 +240,7 @@ public class MQTTConnectionManager {
if (connection != null) {
MQTTSession existingSession = session.getProtocolManager().getSessionState(clientId).getSession();
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
}
// [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 6e9d6bf..488d8bd 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -99,7 +99,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MqttMessage message = (MqttMessage) msg;
if (stopped) {
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
}
disconnect(true);
@@ -109,7 +109,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
// Disconnect if Netty codec failed to decode the stream.
if (message.decoderResult().isFailure()) {
logger.debugf(message.decoderResult().cause(), "Disconnecting client due to message decoding failure.");
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.MALFORMED_PACKET);
}
disconnect(true);
@@ -186,7 +186,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
} catch (Exception e) {
MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e);
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
}
disconnect(true);
@@ -232,7 +232,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
try {
validatedUser = session.getServer().validateUser(username, password, session.getConnection(), session.getProtocolManager().getSecurityDomain());
} catch (ActiveMQSecurityException e) {
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getProtocolHandler().sendConnack(MQTTReasonCodes.BAD_USER_NAME_OR_PASSWORD);
} else {
session.getProtocolHandler().sendConnack(MQTTReasonCodes.NOT_AUTHORIZED_3);
@@ -314,7 +314,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
void handlePublish(MqttPublishMessage message) throws Exception {
- if (session.is5() && session.getProtocolManager().getMaximumPacketSize() != -1 && MQTTUtil.calculateMessageSize(message) > session.getProtocolManager().getMaximumPacketSize()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5 && session.getProtocolManager().getMaximumPacketSize() != -1 && MQTTUtil.calculateMessageSize(message) > session.getProtocolManager().getMaximumPacketSize()) {
sendDisconnect(MQTTReasonCodes.PACKET_TOO_LARGE);
disconnect(true);
return;
@@ -353,7 +353,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, (messageType == MqttMessageType.PUBREL) ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader;
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
variableHeader = new MqttPubReplyMessageVariableHeader(messageId, reasonCode, MqttProperties.NO_PROPERTIES);
} else {
variableHeader = MqttMessageIdVariableHeader.from(messageId);
@@ -391,7 +391,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttUnsubAckMessage unsubAck;
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
unsubAck = new MqttUnsubAckMessage(header, message.variableHeader(), new MqttUnsubAckPayload(reasonCodes));
} else {
unsubAck = new MqttUnsubAckMessage(header, message.variableHeader());
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 5a42d26..9d9ce1d 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -72,6 +72,8 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
private int maximumPacketSize = MQTTUtil.DEFAULT_MAXIMUM_PACKET_SIZE;
+ private boolean closeMqttConnectionOnPublishAuthorizationFailure = true;
+
private final MQTTRoutingHandler routingHandler;
MQTTProtocolManager(ActiveMQServer server,
@@ -128,6 +130,14 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
return this;
}
+ public boolean isCloseMqttConnectionOnPublishAuthorizationFailure() {
+ return closeMqttConnectionOnPublishAuthorizationFailure;
+ }
+
+ public void setCloseMqttConnectionOnPublishAuthorizationFailure(boolean closeMqttConnectionOnPublishAuthorizationFailure) {
+ this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
+ }
+
@Override
public void onNotification(Notification notification) {
if (!(notification.getType() instanceof CoreNotificationType))
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 40d735b..17230a4 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -84,8 +84,11 @@ public class MQTTPublishManager {
private MQTTSessionState.OutboundStore outboundStore;
- public MQTTPublishManager(MQTTSession session) {
+ private boolean closeMqttConnectionOnPublishAuthorizationFailure;
+
+ public MQTTPublishManager(MQTTSession session, boolean closeMqttConnectionOnPublishAuthorizationFailure) {
this.session = session;
+ this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
}
synchronized void start() {
@@ -173,7 +176,7 @@ public class MQTTPublishManager {
void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception {
synchronized (lock) {
String topic = message.variableHeader().topicName();
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
Integer alias = MQTTUtil.getProperty(Integer.class, message.variableHeader().properties(), TOPIC_ALIAS);
Integer topicAliasMax = session.getProtocolManager().getTopicAliasMaximum();
if (alias != null) {
@@ -222,18 +225,40 @@ public class MQTTPublishManager {
tx.commit();
} catch (ActiveMQSecurityException e) {
tx.rollback();
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
sendMessageAck(internal, qos, messageId, MQTTReasonCodes.NOT_AUTHORIZED);
return;
- } else {
+ } else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
/*
- * For MQTT 3.x clients:
+ * For MQTT 3.1.1 clients:
*
* [MQTT-3.3.5-2] If a Server implementation does not authorize a PUBLISH to be performed by a Client;
* it has no way of informing that Client. It MUST either make a positive acknowledgement, according
* to the normal QoS rules, or close the Network Connection
+ *
+ * Throwing an exception here will ultimately close the connection. This is the default behavior.
+ */
+ if (closeMqttConnectionOnPublishAuthorizationFailure) {
+ throw e;
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("MQTT 3.1.1 client not authorized to publish message.");
+ }
+ }
+ } else {
+ /*
+ * For MQTT 3.1 clients:
+ *
+ * Note that if a server implementation does not authorize a PUBLISH to be made by a client, it has no
+ * way of informing that client. It must therefore make a positive acknowledgement, according to the
+ * normal QoS rules, and the client will *not* be informed that it was not authorized to publish the
+ * message.
+ *
+ * Log the failure since we have to just swallow it.
*/
- throw e;
+ if (logger.isDebugEnabled()) {
+ logger.debug("MQTT 3.1 client not authorized to publish message.");
+ }
}
} catch (Throwable t) {
MQTTLogger.LOGGER.failedToPublishMqttMessage(t.getMessage(), t);
@@ -365,7 +390,7 @@ public class MQTTPublishManager {
boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
MqttProperties mqttProperties = getPublishProperties(message);
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
if (session.getState().getSubscription(message.getAddress()) != null && !session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished()) {
isRetain = false;
}
@@ -393,7 +418,7 @@ public class MQTTPublishManager {
MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, payload);
int maxSize = session.getState().getClientMaxPacketSize();
- if (session.is5() && maxSize != 0) {
+ if (session.getVersion() == MQTTVersion.MQTT_5 && maxSize != 0) {
int size = MQTTUtil.calculateMessageSize(publish);
if (size > maxSize) {
/*
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index dfa904f..d8e7fb9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -65,7 +65,7 @@ public class MQTTSession {
private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
- private boolean five = false;
+ private MQTTVersion version = null;
private boolean usingServerKeepAlive = false;
@@ -80,7 +80,7 @@ public class MQTTSession {
this.connection = connection;
mqttConnectionManager = new MQTTConnectionManager(this);
- mqttPublishManager = new MQTTPublishManager(this);
+ mqttPublishManager = new MQTTPublishManager(this, protocolManager.isCloseMqttConnectionOnPublishAuthorizationFailure());
sessionCallback = new MQTTSessionCallback(this, connection);
subscriptionManager = new MQTTSubscriptionManager(this);
retainMessageManager = new MQTTRetainMessageManager(this);
@@ -120,7 +120,7 @@ public class MQTTSession {
state.setDisconnectedTime(System.currentTimeMillis());
}
- if (is5()) {
+ if (getVersion() == MQTTVersion.MQTT_5) {
if (state.getClientSessionExpiryInterval() == 0) {
if (state.isWill() && failure) {
// If the session expires the will message must be sent no matter the will delay
@@ -234,12 +234,12 @@ public class MQTTSession {
return coreMessageObjectPools;
}
- public boolean is5() {
- return five;
+ public void setVersion(MQTTVersion version) {
+ this.version = version;
}
- public void set5(boolean five) {
- this.five = five;
+ public MQTTVersion getVersion() {
+ return this.version;
}
public boolean isUsingServerKeepAlive() {
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 86ce69e..0eed8f6 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -309,13 +309,30 @@ public class MQTTSubscriptionManager {
qos[i] = subscriptions.get(i).qualityOfService().value();
} catch (ActiveMQSecurityException e) {
// user is not authorized to create subsription
- if (session.is5()) {
+ if (session.getVersion() == MQTTVersion.MQTT_5) {
qos[i] = MQTTReasonCodes.NOT_AUTHORIZED;
- } else {
+ } else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
qos[i] = MQTTReasonCodes.UNSPECIFIED_ERROR;
+ } else {
+ /*
+ * For MQTT 3.1 clients:
+ *
+ * Note that if a server implementation does not authorize a SUBSCRIBE request to be made by a client,
+ * it has no way of informing that client. It must therefore make a positive acknowledgement with a
+ * SUBACK, and the client will not be informed that it was not authorized to subscribe.
+ *
+ *
+ * For MQTT 3.1.1 clients (handled by the branch above, which records UNSPECIFIED_ERROR instead):
+ *
+ * The 3.1.1 spec doesn't directly address the situation where the server does not authorize a
+ * SUBSCRIBE. It really just says this:
+ *
+ * [MQTT-3.8.4-1] When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond
+ * with a SUBACK Packet.
+ */
+ qos[i] = subscriptions.get(i).qualityOfService().value();
}
}
-
}
return qos;
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTVersion.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTVersion.java
new file mode 100644
index 0000000..a810850
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTVersion.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+public enum MQTTVersion {
+
+ MQTT_3_1, MQTT_3_1_1, MQTT_5;
+
+ public int getVersion() {
+ switch (this) {
+ case MQTT_3_1:
+ return 3;
+ case MQTT_3_1_1:
+ return 4;
+ case MQTT_5:
+ return 5;
+ default:
+ return -1;
+ }
+ }
+
+ public static MQTTVersion getVersion(int version) {
+ switch (version) {
+ case 3:
+ return MQTT_3_1;
+ case 4:
+ return MQTT_3_1_1;
+ case 5:
+ return MQTT_5;
+ default:
+ return null;
+ }
+ }
+}
diff --git a/docs/user-manual/en/mqtt.md b/docs/user-manual/en/mqtt.md
index d3d50be..6f1f280 100644
--- a/docs/user-manual/en/mqtt.md
+++ b/docs/user-manual/en/mqtt.md
@@ -262,4 +262,24 @@ response style authentication.
However, there are currently no challenge / response mechanisms implemented so if
a client passes the "Authentication Method" property in its `CONNECT` packet it will
receive a `CONNACK` with a reason code of `0x8C` (i.e. bad authentication method)
-and the network connection will be closed.
\ No newline at end of file
+and the network connection will be closed.
+
+## Publish Authorization Failures
+
+The MQTT 3.1.1 specification is ambiguous regarding the broker's behavior when
+a `PUBLISH` packet fails due to a lack of authorization. In [section 3.3.5](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718042)
+it says:
+
+> If a Server implementation does not authorize a PUBLISH to be performed by a
+> Client; it has no way of informing that Client. It MUST either make a positive
+> acknowledgement, according to the normal QoS rules, or close the Network
+> Connection
+
+By default the broker will close the network connection. However, if you'd rather
+have the broker make a positive acknowledgement then set the URL parameter
+`closeMqttConnectionOnPublishAuthorizationFailure` to `false` on the relevant
+MQTT `acceptor` in `broker.xml`, e.g.:
+
+```xml
+<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT;closeMqttConnectionOnPublishAuthorizationFailure=false</acceptor>
+```
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityTest.java
index 3938aab..bba711d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityTest.java
@@ -16,15 +16,20 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt;
+import java.io.EOFException;
import java.util.Arrays;
import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.MQTTException;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNACK;
import org.junit.Test;
+import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME;
+
public class MQTTSecurityTest extends MQTTTestSupport {
@Override
@@ -77,4 +82,108 @@ public class MQTTSecurityTest extends MQTTTestSupport {
}
}
}
+
+ @Test(timeout = 30000)
+ public void testPublishAuthorizationFailOn311WithDisconnect() throws Exception {
+ String version = "3.1.1";
+
+ BlockingConnection connection = null;
+ try {
+ MQTT mqtt = createMQTTConnection("test-" + version, true);
+ mqtt.setUserName(noprivUser);
+ mqtt.setPassword(noprivPass);
+ mqtt.setConnectAttemptsMax(1);
+ mqtt.setVersion(version);
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.publish("foo", new byte[0], QoS.EXACTLY_ONCE, false);
+ fail("Should have triggered an exception");
+ } catch (EOFException e) {
+ // OK
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Should not have caught an Exception");
+ } finally {
+ if (connection != null && connection.isConnected())
+ connection.disconnect();
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void testPublishAuthorizationFailOn311WithoutDisconnect() throws Exception {
+ setAcceptorProperty("closeMqttConnectionOnPublishAuthorizationFailure=false");
+ String version = "3.1.1";
+
+ BlockingConnection connection = null;
+ try {
+ MQTT mqtt = createMQTTConnection("test-" + version, true);
+ mqtt.setUserName(noprivUser);
+ mqtt.setPassword(noprivPass);
+ mqtt.setConnectAttemptsMax(1);
+ mqtt.setVersion(version);
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.publish("foo", new byte[0], QoS.EXACTLY_ONCE, false);
+ assertTrue(connection.isConnected());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Should not have caught an Exception");
+ } finally {
+ if (connection != null && connection.isConnected())
+ connection.disconnect();
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void testPublishAuthorizationFailOn31() throws Exception {
+ String version = "3.1";
+
+ BlockingConnection connection = null;
+ try {
+ MQTT mqtt = createMQTTConnection("test-" + version, true);
+ mqtt.setUserName(noprivUser);
+ mqtt.setPassword(noprivPass);
+ mqtt.setConnectAttemptsMax(1);
+ mqtt.setVersion(version);
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.publish("foo", new byte[0], QoS.EXACTLY_ONCE, false);
+ assertTrue(connection.isConnected());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Should not have caught an Exception");
+ } finally {
+ if (connection != null && connection.isConnected())
+ connection.disconnect();
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void testSubscribeAuthorizationFail() throws Exception {
+ for (String version : Arrays.asList("3.1", "3.1.1")) {
+ BlockingConnection connection = null;
+ try {
+ MQTT mqtt = createMQTTConnection("test-" + version, true);
+ mqtt.setUserName(noprivUser);
+ mqtt.setPassword(noprivPass);
+ mqtt.setConnectAttemptsMax(1);
+ mqtt.setVersion(version);
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.subscribe(new Topic[]{new Topic("foo", QoS.AT_MOST_ONCE)});
+ assertTrue(connection.isConnected());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Should not have caught an Exception");
+ } finally {
+ if (connection != null && connection.isConnected())
+ connection.disconnect();
+ }
+ }
+ }
+
+ protected void setAcceptorProperty(String property) throws Exception {
+ server.getRemotingService().getAcceptor(MQTT_PROTOCOL_NAME).stop();
+ server.getRemotingService().createAcceptor(MQTT_PROTOCOL_NAME, "tcp://localhost:" + port + "?protocols=MQTT;" + property).start();
+ }
}