You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/04/20 18:17:30 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5734

Repository: activemq
Updated Branches:
  refs/heads/master 4a821186a -> a4fbe7087


https://issues.apache.org/jira/browse/AMQ-5734

Support MQTT 3.1 silent subscription fail


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a4fbe708
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a4fbe708
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a4fbe708

Branch: refs/heads/master
Commit: a4fbe708726b4846fa9831e083f0fbf554b4f324
Parents: 4a82118
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Mon Apr 20 18:15:20 2015 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon Apr 20 18:17:09 2015 +0200

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   |  6 +++++
 .../AbstractMQTTSubscriptionStrategy.java       |  8 ++++++-
 .../activemq/transport/mqtt/MQTTAuthTest.java   | 24 +++++++++++++++++++-
 3 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a4fbe708/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 4e0b0df..37c0c4c 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -87,6 +87,8 @@ public class MQTTProtocolConverter {
     private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
 
     public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
+    public static final int V3_1 = 3;
+    public static final int V3_1_1 = 4;
 
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
     private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
@@ -119,6 +121,8 @@ public class MQTTProtocolConverter {
     private final MQTTPacketIdGenerator packetIdGenerator;
     private boolean publishDollarTopics;
 
+    public int version;
+
     private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
 
     /*
@@ -246,6 +250,8 @@ public class MQTTProtocolConverter {
             passswd = connect.password().toString();
         }
 
+        version = connect.version();
+
         configureInactivityMonitor(connect.keepAlive());
 
         connectionInfo.setConnectionId(connectionId);

http://git-wip-us.apache.org/repos/asf/activemq/blob/a4fbe708/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
index 121b829..988a065 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
@@ -204,7 +204,13 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
                 if (response.isException()) {
                     final Throwable throwable = ((ExceptionResponse) response).getException();
                     LOG.warn("Error subscribing to {}", topicName, throwable);
-                    qos[0] = SUBSCRIBE_ERROR;
+                    // version 3.1 don't supports silent fail
+                    // version 3.1.1 send "error" qos
+                    if (protocol.version == protocol.V3_1_1) {
+                        qos[0] = SUBSCRIBE_ERROR;
+                    } else {
+                        qos[0] = (byte) qoS.ordinal();
+                    }
                 } else {
                     qos[0] = (byte) qoS.ordinal();
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a4fbe708/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
index a8ced02..07a8cc9 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
@@ -138,8 +138,9 @@ public class MQTTAuthTest extends MQTTAuthTestSupport {
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo");
         mqtt.setKeepAlive((short) 2);
+        mqtt.setVersion("3.1.1");
 
-        final BlockingConnection connection = mqtt.blockingConnection();
+        BlockingConnection connection = mqtt.blockingConnection();
         connection.connect();
 
         final String NAMED = "named";
@@ -163,7 +164,28 @@ public class MQTTAuthTest extends MQTTAuthTestSupport {
         assertEquals(ANONYMOUS, new String(msg.getPayload()));
         msg.ack();
 
+        //delete retained message
+        connection.publish(ANONYMOUS, "".getBytes(), QoS.AT_MOST_ONCE, true);
+
         connection.disconnect();
+
+        // Test 3.1 functionality
+        mqtt.setVersion("3.1");
+        connection = mqtt.blockingConnection();
+        connection.connect();
+        qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE) });
+        assertEquals(QoS.AT_MOST_ONCE.ordinal(), qos[0]);
+
+        MQTT mqttPub = createMQTTConnection("pub", true);
+        mqttPub.setUserName("admin");
+        mqttPub.setPassword("admin");
+
+        BlockingConnection connectionPub = mqttPub.blockingConnection();
+        connectionPub.connect();
+        connectionPub.publish(NAMED, NAMED.getBytes(), QoS.AT_MOST_ONCE, true);
+
+        msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+        assertNull(msg);
     }
 
     @Test(timeout = 60 * 1000)