You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/03/15 19:34:00 UTC

[activemq-artemis] branch main updated: ARTEMIS-3702 auth failures don't adhere to MQTT spec

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b5a25a  ARTEMIS-3702 auth failures don't adhere to MQTT spec
2b5a25a is described below

commit 2b5a25a10611366fd4f643587efcf2292b0f548f
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Mar 3 22:04:20 2022 -0600

    ARTEMIS-3702 auth failures don't adhere to MQTT spec
    
    The commit includes the following changes:
     - Don't drop the connection on subscribe or publish authorization
    failures for 3.1 clients.
     - Don't drop the connection on subscribe authorization failures for
    3.1.1 clients.
     - Add configuration parameter to control behavior on publish
    authorization failures for 3.1.1 clients (either disconnect or not).
---
 .../core/protocol/mqtt/MQTTConnectionManager.java  |  11 +--
 .../core/protocol/mqtt/MQTTProtocolHandler.java    |  14 +--
 .../core/protocol/mqtt/MQTTProtocolManager.java    |  10 ++
 .../core/protocol/mqtt/MQTTPublishManager.java     |  41 ++++++--
 .../artemis/core/protocol/mqtt/MQTTSession.java    |  14 +--
 .../protocol/mqtt/MQTTSubscriptionManager.java     |  23 ++++-
 .../artemis/core/protocol/mqtt/MQTTVersion.java    |  49 +++++++++
 docs/user-manual/en/mqtt.md                        |  22 ++++-
 .../tests/integration/mqtt/MQTTSecurityTest.java   | 109 +++++++++++++++++++++
 9 files changed, 261 insertions(+), 32 deletions(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index c92e8c9..d56e18f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -58,9 +58,8 @@ public class MQTTConnectionManager {
    }
 
    void connect(MqttConnectMessage connect, String validatedUser) throws Exception {
-      int packetVersion = connect.variableHeader().version();
-      if (packetVersion == MqttVersion.MQTT_5.protocolLevel()) {
-         session.set5(true);
+      session.setVersion(MQTTVersion.getVersion(connect.variableHeader().version()));
+      if (session.getVersion() == MQTTVersion.MQTT_5) {
          session.getConnection().setProtocolVersion(Byte.toString(MqttVersion.MQTT_5.protocolLevel()));
          String authenticationMethod = MQTTUtil.getProperty(String.class, connect.variableHeader().properties(), AUTHENTICATION_METHOD);
 
@@ -121,7 +120,7 @@ public class MQTTConnectionManager {
             session.getState().setWillRetain(connect.variableHeader().isWillRetain());
             session.getState().setWillTopic(connect.payload().willTopic());
 
-            if (session.is5()) {
+            if (session.getVersion() == MQTTVersion.MQTT_5) {
                MqttProperties willProperties = connect.payload().willProperties();
                if (willProperties != null) {
                   MqttProperties.MqttProperty willDelayInterval = willProperties.getProperty(WILL_DELAY_INTERVAL.value());
@@ -133,7 +132,7 @@ public class MQTTConnectionManager {
          }
 
          MqttProperties connackProperties;
-         if (session.is5()) {
+         if (session.getVersion() == MQTTVersion.MQTT_5) {
             session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));
 
             sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
@@ -241,7 +240,7 @@ public class MQTTConnectionManager {
 
          if (connection != null) {
             MQTTSession existingSession = session.getProtocolManager().getSessionState(clientId).getSession();
-            if (session.is5()) {
+            if (session.getVersion() == MQTTVersion.MQTT_5) {
                existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
             }
             // [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 6e9d6bf..488d8bd 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -99,7 +99,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       MqttMessage message = (MqttMessage) msg;
 
       if (stopped) {
-         if (session.is5()) {
+         if (session.getVersion() == MQTTVersion.MQTT_5) {
             sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
          }
          disconnect(true);
@@ -109,7 +109,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       // Disconnect if Netty codec failed to decode the stream.
       if (message.decoderResult().isFailure()) {
          logger.debugf(message.decoderResult().cause(), "Disconnecting client due to message decoding failure.");
-         if (session.is5()) {
+         if (session.getVersion() == MQTTVersion.MQTT_5) {
             sendDisconnect(MQTTReasonCodes.MALFORMED_PACKET);
          }
          disconnect(true);
@@ -186,7 +186,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
          }
       } catch (Exception e) {
          MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e);
-         if (session.is5()) {
+         if (session.getVersion() == MQTTVersion.MQTT_5) {
             sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
          }
          disconnect(true);
@@ -232,7 +232,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       try {
          validatedUser = session.getServer().validateUser(username, password, session.getConnection(), session.getProtocolManager().getSecurityDomain());
       } catch (ActiveMQSecurityException e) {
-         if (session.is5()) {
+         if (session.getVersion() == MQTTVersion.MQTT_5) {
             session.getProtocolHandler().sendConnack(MQTTReasonCodes.BAD_USER_NAME_OR_PASSWORD);
          } else {
             session.getProtocolHandler().sendConnack(MQTTReasonCodes.NOT_AUTHORIZED_3);
@@ -314,7 +314,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
    }
 
    void handlePublish(MqttPublishMessage message) throws Exception {
-      if (session.is5() && session.getProtocolManager().getMaximumPacketSize() != -1 && MQTTUtil.calculateMessageSize(message) > session.getProtocolManager().getMaximumPacketSize()) {
+      if (session.getVersion() == MQTTVersion.MQTT_5 && session.getProtocolManager().getMaximumPacketSize() != -1 && MQTTUtil.calculateMessageSize(message) > session.getProtocolManager().getMaximumPacketSize()) {
          sendDisconnect(MQTTReasonCodes.PACKET_TOO_LARGE);
          disconnect(true);
          return;
@@ -353,7 +353,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, (messageType == MqttMessageType.PUBREL) ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE, false, 0);
 
       MqttMessageIdVariableHeader variableHeader;
-      if (session.is5()) {
+      if (session.getVersion() == MQTTVersion.MQTT_5) {
          variableHeader = new MqttPubReplyMessageVariableHeader(messageId, reasonCode, MqttProperties.NO_PROPERTIES);
       } else {
          variableHeader = MqttMessageIdVariableHeader.from(messageId);
@@ -391,7 +391,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
       MqttUnsubAckMessage unsubAck;
-      if (session.is5()) {
+      if (session.getVersion() == MQTTVersion.MQTT_5) {
          unsubAck = new MqttUnsubAckMessage(header, message.variableHeader(), new MqttUnsubAckPayload(reasonCodes));
       } else {
          unsubAck = new MqttUnsubAckMessage(header, message.variableHeader());
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 5a42d26..9d9ce1d 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -72,6 +72,8 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
 
    private int maximumPacketSize = MQTTUtil.DEFAULT_MAXIMUM_PACKET_SIZE;
 
+   private boolean closeMqttConnectionOnPublishAuthorizationFailure = true;
+
    private final MQTTRoutingHandler routingHandler;
 
    MQTTProtocolManager(ActiveMQServer server,
@@ -128,6 +130,14 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
       return this;
    }
 
+   public boolean isCloseMqttConnectionOnPublishAuthorizationFailure() {
+      return closeMqttConnectionOnPublishAuthorizationFailure;
+   }
+
+   public void setCloseMqttConnectionOnPublishAuthorizationFailure(boolean closeMqttConnectionOnPublishAuthorizationFailure) {
+      this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
+   }
+
    @Override
    public void onNotification(Notification notification) {
       if (!(notification.getType() instanceof CoreNotificationType))
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 40d735b..17230a4 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -84,8 +84,11 @@ public class MQTTPublishManager {
 
    private MQTTSessionState.OutboundStore outboundStore;
 
-   public MQTTPublishManager(MQTTSession session) {
+   private boolean closeMqttConnectionOnPublishAuthorizationFailure;
+
+   public MQTTPublishManager(MQTTSession session, boolean closeMqttConnectionOnPublishAuthorizationFailure) {
       this.session = session;
+      this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
    }
 
    synchronized void start() {
@@ -173,7 +176,7 @@ public class MQTTPublishManager {
    void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception {
       synchronized (lock) {
          String topic = message.variableHeader().topicName();
-         if (session.is5()) {
+         if (session.getVersion() == MQTTVersion.MQTT_5) {
             Integer alias = MQTTUtil.getProperty(Integer.class, message.variableHeader().properties(), TOPIC_ALIAS);
             Integer topicAliasMax = session.getProtocolManager().getTopicAliasMaximum();
             if (alias != null) {
@@ -222,18 +225,40 @@ public class MQTTPublishManager {
                tx.commit();
             } catch (ActiveMQSecurityException e) {
                tx.rollback();
-               if (session.is5()) {
+               if (session.getVersion() == MQTTVersion.MQTT_5) {
                   sendMessageAck(internal, qos, messageId, MQTTReasonCodes.NOT_AUTHORIZED);
                   return;
-               } else {
+               } else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
                   /*
-                   * For MQTT 3.x clients:
+                   * For MQTT 3.1.1 clients:
                    *
                    * [MQTT-3.3.5-2] If a Server implementation does not authorize a PUBLISH to be performed by a Client;
                    * it has no way of informing that Client. It MUST either make a positive acknowledgement, according
                    * to the normal QoS rules, or close the Network Connection
+                   *
+                   * Throwing an exception here will ultimately close the connection. This is the default behavior.
+                   */
+                  if (closeMqttConnectionOnPublishAuthorizationFailure) {
+                     throw e;
+                  } else {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("MQTT 3.1.1 client not authorized to publish message.");
+                     }
+                  }
+               } else {
+                  /*
+                   * For MQTT 3.1 clients:
+                   *
+                   * Note that if a server implementation does not authorize a PUBLISH to be made by a client, it has no
+                   * way of informing that client. It must therefore make a positive acknowledgement, according to the
+                   * normal QoS rules, and the client will *not* be informed that it was not authorized to publish the
+                   * message.
+                   *
+                   * Log the failure since we have to just swallow it.
                    */
-                  throw e;
+                  if (logger.isDebugEnabled()) {
+                     logger.debug("MQTT 3.1 client not authorized to publish message.");
+                  }
                }
             } catch (Throwable t) {
                MQTTLogger.LOGGER.failedToPublishMqttMessage(t.getMessage(), t);
@@ -365,7 +390,7 @@ public class MQTTPublishManager {
       boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
       MqttProperties mqttProperties = getPublishProperties(message);
 
-      if (session.is5()) {
+      if (session.getVersion() == MQTTVersion.MQTT_5) {
          if (session.getState().getSubscription(message.getAddress()) != null && !session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished()) {
             isRetain = false;
          }
@@ -393,7 +418,7 @@ public class MQTTPublishManager {
       MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, payload);
 
       int maxSize = session.getState().getClientMaxPacketSize();
-      if (session.is5() && maxSize != 0) {
+      if (session.getVersion() == MQTTVersion.MQTT_5 && maxSize != 0) {
          int size = MQTTUtil.calculateMessageSize(publish);
          if (size > maxSize) {
             /*
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index dfa904f..d8e7fb9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -65,7 +65,7 @@ public class MQTTSession {
 
    private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
 
-   private boolean five = false;
+   private MQTTVersion version = null;
 
    private boolean usingServerKeepAlive = false;
 
@@ -80,7 +80,7 @@ public class MQTTSession {
       this.connection = connection;
 
       mqttConnectionManager = new MQTTConnectionManager(this);
-      mqttPublishManager = new MQTTPublishManager(this);
+      mqttPublishManager = new MQTTPublishManager(this, protocolManager.isCloseMqttConnectionOnPublishAuthorizationFailure());
       sessionCallback = new MQTTSessionCallback(this, connection);
       subscriptionManager = new MQTTSubscriptionManager(this);
       retainMessageManager = new MQTTRetainMessageManager(this);
@@ -120,7 +120,7 @@ public class MQTTSession {
             state.setDisconnectedTime(System.currentTimeMillis());
          }
 
-         if (is5()) {
+         if (getVersion() == MQTTVersion.MQTT_5) {
             if (state.getClientSessionExpiryInterval() == 0) {
                if (state.isWill() && failure) {
                   // If the session expires the will message must be sent no matter the will delay
@@ -234,12 +234,12 @@ public class MQTTSession {
       return coreMessageObjectPools;
    }
 
-   public boolean is5() {
-      return five;
+   public void setVersion(MQTTVersion version) {
+      this.version = version;
    }
 
-   public void set5(boolean five) {
-      this.five = five;
+   public MQTTVersion getVersion() {
+      return this.version;
    }
 
    public boolean isUsingServerKeepAlive() {
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 86ce69e..0eed8f6 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -309,13 +309,30 @@ public class MQTTSubscriptionManager {
                qos[i] = subscriptions.get(i).qualityOfService().value();
             } catch (ActiveMQSecurityException e) {
                // user is not authorized to create subsription
-               if (session.is5()) {
+               if (session.getVersion() == MQTTVersion.MQTT_5) {
                   qos[i] = MQTTReasonCodes.NOT_AUTHORIZED;
-               } else {
+               } else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
                   qos[i] = MQTTReasonCodes.UNSPECIFIED_ERROR;
+               } else {
+                  /*
+                   * For MQTT 3.1 clients:
+                   *
+                   * Note that if a server implementation does not authorize a SUBSCRIBE request to be made by a client,
+                   * it has no way of informing that client. It must therefore make a positive acknowledgement with a
+                   * SUBACK, and the client will not be informed that it was not authorized to subscribe.
+                   *
+                   *
+                   * For MQTT 3.1.1 clients:
+                   *
+                   * The 3.1.1 spec doesn't directly address the situation where the server does not authorize a
+                   * SUBSCRIBE. It really just says this:
+                   *
+                   * [MQTT-3.8.4-1] When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond
+                   *  with a SUBACK Packet.
+                   */
+                  qos[i] = subscriptions.get(i).qualityOfService().value();
                }
             }
-
          }
          return qos;
       }
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTVersion.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTVersion.java
new file mode 100644
index 0000000..a810850
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTVersion.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+public enum MQTTVersion {
+
+   MQTT_3_1, MQTT_3_1_1, MQTT_5;
+
+   public int getVersion() {
+      switch (this) {
+         case MQTT_3_1:
+            return 3;
+         case MQTT_3_1_1:
+            return 4;
+         case MQTT_5:
+            return 5;
+         default:
+            return -1;
+      }
+   }
+
+   public static MQTTVersion getVersion(int version) {
+      switch (version) {
+         case 3:
+            return MQTT_3_1;
+         case 4:
+            return MQTT_3_1_1;
+         case 5:
+            return MQTT_5;
+         default:
+            return null;
+      }
+   }
+}
diff --git a/docs/user-manual/en/mqtt.md b/docs/user-manual/en/mqtt.md
index d3d50be..6f1f280 100644
--- a/docs/user-manual/en/mqtt.md
+++ b/docs/user-manual/en/mqtt.md
@@ -262,4 +262,24 @@ response style authentication.
 However, there are currently no challenge / response mechanisms implemented so if
 a client passes the "Authentication Method" property in its `CONNECT` packet it will
 receive a `CONNACK` with a reason code of `0x8C` (i.e. bad authentication method)
-and the network connection will be closed.
\ No newline at end of file
+and the network connection will be closed.
+
+## Publish Authorization Failures
+
+The MQTT 3.1.1 specification is ambiguous regarding the broker's behavior when
+a `PUBLISH` packet fails due to a lack of authorization. In [section 3.3.5](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718042)
+it says:
+
+> If a Server implementation does not authorize a PUBLISH to be performed by a
+> Client; it has no way of informing that Client. It MUST either make a positive
+> acknowledgement, according to the normal QoS rules, or close the Network
+> Connection
+
+By default the broker will close the network connection. However, if you'd rather
+have the broker make a positive acknowledgement then set the URL parameter
+`closeMqttConnectionOnPublishAuthorizationFailure` to `false` on the relevant
+MQTT `acceptor` in `broker.xml`, e.g.:
+
+```xml
+<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT;closeMqttConnectionOnPublishAuthorizationFailure=false</acceptor>
+```
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityTest.java
index 3938aab..bba711d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityTest.java
@@ -16,15 +16,20 @@
  */
 package org.apache.activemq.artemis.tests.integration.mqtt;
 
+import java.io.EOFException;
 import java.util.Arrays;
 
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.MQTTException;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
 import org.fusesource.mqtt.codec.CONNACK;
 import org.junit.Test;
 
+import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME;
+
 public class MQTTSecurityTest extends MQTTTestSupport {
 
    @Override
@@ -77,4 +82,108 @@ public class MQTTSecurityTest extends MQTTTestSupport {
          }
       }
    }
+
+   @Test(timeout = 30000)
+   public void testPublishAuthorizationFailOn311WithDisconnect() throws Exception {
+      String version = "3.1.1";
+
+      BlockingConnection connection = null;
+      try {
+         MQTT mqtt = createMQTTConnection("test-" + version, true);
+         mqtt.setUserName(noprivUser);
+         mqtt.setPassword(noprivPass);
+         mqtt.setConnectAttemptsMax(1);
+         mqtt.setVersion(version);
+         connection = mqtt.blockingConnection();
+         connection.connect();
+         connection.publish("foo", new byte[0], QoS.EXACTLY_ONCE, false);
+         fail("Should have triggered an exception");
+      } catch (EOFException e) {
+         // OK
+      } catch (Exception e) {
+         e.printStackTrace();
+         fail("Should not have caught an Exception");
+      } finally {
+         if (connection != null && connection.isConnected())
+            connection.disconnect();
+      }
+   }
+
+   @Test(timeout = 30000)
+   public void testPublishAuthorizationFailOn311WithoutDisconnect() throws Exception {
+      setAcceptorProperty("closeMqttConnectionOnPublishAuthorizationFailure=false");
+      String version = "3.1.1";
+
+      BlockingConnection connection = null;
+      try {
+         MQTT mqtt = createMQTTConnection("test-" + version, true);
+         mqtt.setUserName(noprivUser);
+         mqtt.setPassword(noprivPass);
+         mqtt.setConnectAttemptsMax(1);
+         mqtt.setVersion(version);
+         connection = mqtt.blockingConnection();
+         connection.connect();
+         connection.publish("foo", new byte[0], QoS.EXACTLY_ONCE, false);
+         assertTrue(connection.isConnected());
+      } catch (Exception e) {
+         e.printStackTrace();
+         fail("Should not have caught an Exception");
+      } finally {
+         if (connection != null && connection.isConnected())
+            connection.disconnect();
+      }
+   }
+
+   @Test(timeout = 30000)
+   public void testPublishAuthorizationFailOn31() throws Exception {
+      String version = "3.1";
+
+      BlockingConnection connection = null;
+      try {
+         MQTT mqtt = createMQTTConnection("test-" + version, true);
+         mqtt.setUserName(noprivUser);
+         mqtt.setPassword(noprivPass);
+         mqtt.setConnectAttemptsMax(1);
+         mqtt.setVersion(version);
+         connection = mqtt.blockingConnection();
+         connection.connect();
+         connection.publish("foo", new byte[0], QoS.EXACTLY_ONCE, false);
+         assertTrue(connection.isConnected());
+      } catch (Exception e) {
+         e.printStackTrace();
+         fail("Should not have caught an Exception");
+      } finally {
+         if (connection != null && connection.isConnected())
+            connection.disconnect();
+      }
+   }
+
+   @Test(timeout = 30000)
+   public void testSubscribeAuthorizationFail() throws Exception {
+      for (String version : Arrays.asList("3.1", "3.1.1")) {
+         BlockingConnection connection = null;
+         try {
+            MQTT mqtt = createMQTTConnection("test-" + version, true);
+            mqtt.setUserName(noprivUser);
+            mqtt.setPassword(noprivPass);
+            mqtt.setConnectAttemptsMax(1);
+            mqtt.setVersion(version);
+            connection = mqtt.blockingConnection();
+            connection.connect();
+            connection.subscribe(new Topic[]{new Topic("foo", QoS.AT_MOST_ONCE)});
+            assertTrue(connection.isConnected());
+         } catch (Exception e) {
+            e.printStackTrace();
+            fail("Should not have caught an Exception");
+         } finally {
+            if (connection != null && connection.isConnected())
+               connection.disconnect();
+         }
+      }
+   }
+
+   protected void setAcceptorProperty(String property) throws Exception {
+      server.getRemotingService().getAcceptor(MQTT_PROTOCOL_NAME).stop();
+      server.getRemotingService().createAcceptor(MQTT_PROTOCOL_NAME, "tcp://localhost:" + port + "?protocols=MQTT;" + property).start();
+   }
 }