You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/03/02 18:32:07 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5883 https://issues.apache.org/jira/browse/AMQ-5884 https://issues.apache.org/jira/browse/AMQ-5885

Repository: activemq
Updated Branches:
  refs/heads/master 8ef44452a -> 5d6d42ce9


https://issues.apache.org/jira/browse/AMQ-5883
https://issues.apache.org/jira/browse/AMQ-5884
https://issues.apache.org/jira/browse/AMQ-5885

Add additional validation of Topic names used in subscribe and
unsubscriobe that test for spec compliance.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5d6d42ce
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5d6d42ce
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5d6d42ce

Branch: refs/heads/master
Commit: 5d6d42ce97a8a6cacd630574e54b330275d041a7
Parents: 8ef4445
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Mar 2 12:30:54 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Mar 2 12:30:54 2016 -0500

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   |  12 +-
 .../transport/mqtt/MQTTProtocolSupport.java     |  87 ++++++++
 .../activemq/transport/mqtt/MQTTTest.java       | 202 +++++++++++++++++++
 3 files changed, 298 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5d6d42ce/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 97a74a9..154ec53 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -367,6 +367,7 @@ public class MQTTProtocolConverter {
         if (topics != null) {
             byte[] qos = new byte[topics.length];
             for (int i = 0; i < topics.length; i++) {
+                MQTTProtocolSupport.validate(topics[i].name().toString());
                 try {
                     qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]);
                 } catch (IOException e) {
@@ -383,6 +384,7 @@ public class MQTTProtocolConverter {
             }
         } else {
             LOG.warn("No topics defined for Subscription " + command);
+            throw new MQTTProtocolException("SUBSCRIBE command received with no topic filter");
         }
     }
 
@@ -394,16 +396,20 @@ public class MQTTProtocolConverter {
         UTF8Buffer[] topics = command.topics();
         if (topics != null) {
             for (UTF8Buffer topic : topics) {
+                MQTTProtocolSupport.validate(topic.toString());
                 try {
                     findSubscriptionStrategy().onUnSubscribe(topic.toString());
                 } catch (IOException e) {
                     throw new MQTTProtocolException("Failed to process unsubscribe request", true, e);
                 }
             }
+            UNSUBACK ack = new UNSUBACK();
+            ack.messageId(command.messageId());
+            sendToMQTT(ack.encode());
+        } else {
+            LOG.warn("No topics defined for Subscription " + command);
+            throw new MQTTProtocolException("UNSUBSCRIBE command received with no topic filter");
         }
-        UNSUBACK ack = new UNSUBACK();
-        ack.messageId(command.messageId());
-        sendToMQTT(ack.encode());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/5d6d42ce/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
index 90f8644..4a4d3c5 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.io.UnsupportedEncodingException;
+
 import org.fusesource.mqtt.codec.CONNECT;
 import org.fusesource.mqtt.codec.DISCONNECT;
 import org.fusesource.mqtt.codec.PINGREQ;
@@ -32,6 +34,16 @@ import org.fusesource.mqtt.codec.UNSUBSCRIBE;
  */
 public class MQTTProtocolSupport {
 
+    private static final int TOPIC_NAME_MIN_LENGTH = 1;
+    private static final int TOPIC_NAME_MAX_LENGTH = 65535;
+
+    private static final String MULTI_LEVEL_WILDCARD = "#";
+    private static final String SINGLE_LEVEL_WILDCARD = "+";
+
+    private static final char MULTI_LEVEL_WILDCARD_CHAR = '#';
+    private static final char SINGLE_LEVEL_WILDCARD_CHAR = '+';
+    private static final char TOPIC_LEVEL_SEPERATOR_CHAR = '/';
+
     /**
      * Converts an MQTT formatted Topic name into a suitable ActiveMQ Destination
      * name string.
@@ -142,4 +154,79 @@ public class MQTTProtocolSupport {
                 return "UNKNOWN";
         }
     }
+
+    /**
+     * Validate that the Topic names given by client commands are valid
+     * based on the MQTT protocol specification.
+     *
+     * @param topicName
+     *      the given Topic name provided by the client.
+     *
+     * @throws MQTTProtocolException if the value given is invalid.
+     */
+    public static void validate(String topicName) throws MQTTProtocolException {
+        int topicLen = 0;
+        try {
+            topicLen = topicName.getBytes("UTF-8").length;
+        } catch (UnsupportedEncodingException e) {
+            throw new MQTTProtocolException("Topic name contained invalid UTF-8 encoding.");
+        }
+
+        // Spec: Unless stated otherwise all UTF-8 encoded strings can have any length in
+        //       the range 0 to 65535 bytes.
+        if (topicLen < TOPIC_NAME_MIN_LENGTH || topicLen > TOPIC_NAME_MAX_LENGTH) {
+            throw new MQTTProtocolException("Topic name given had invliad length.");
+        }
+
+        // 4.7.1.2 and 4.7.1.3 these can stand alone
+        if (MULTI_LEVEL_WILDCARD.equals(topicName) || SINGLE_LEVEL_WILDCARD.equals(topicName)) {
+            return;
+        }
+
+        // Spec: 4.7.1.2
+        //  The multi-level wildcard character MUST be specified either on its own or following a
+        //  topic level separator. In either case it MUST be the last character specified in the
+        //  Topic Filter [MQTT-4.7.1-2].
+        int numWildCards = 0;
+        for (int i = 0; i < topicName.length(); ++i) {
+            if (topicName.charAt(i) == MULTI_LEVEL_WILDCARD_CHAR) {
+                numWildCards++;
+
+                // If prev exists it must be a separator
+                if (i > 0 && topicName.charAt(i - 1) != TOPIC_LEVEL_SEPERATOR_CHAR) {
+                    throw new MQTTProtocolException("The multi level wildcard must stand alone: " + topicName);
+                }
+            }
+
+            if (numWildCards > 1) {
+                throw new MQTTProtocolException("Topic Filter can only have one multi-level filter: " + topicName);
+            }
+        }
+
+        if (topicName.contains(MULTI_LEVEL_WILDCARD) && !topicName.endsWith(MULTI_LEVEL_WILDCARD)) {
+            throw new MQTTProtocolException("The multi-level filter must be at the end of the Topic name: " + topicName);
+        }
+
+        // Spec: 4.7.1.3
+        // The single-level wildcard can be used at any level in the Topic Filter, including
+        // first and last levels. Where it is used it MUST occupy an entire level of the filter
+        //
+        // [MQTT-4.7.1-3]. It can be used at more than one level in the Topic Filter and can be
+        // used in conjunction with the multilevel wildcard.
+        for (int i = 0; i < topicName.length(); ++i) {
+            if (topicName.charAt(i) != SINGLE_LEVEL_WILDCARD_CHAR) {
+                continue;
+            }
+
+            // If prev exists it must be a separator
+            if (i > 0 && topicName.charAt(i - 1) != TOPIC_LEVEL_SEPERATOR_CHAR) {
+                throw new MQTTProtocolException("The single level wildcard must stand alone: " + topicName);
+            }
+
+            // If next exists it must be a separator
+            if (i < topicName.length() - 1 && topicName.charAt(i + 1) != TOPIC_LEVEL_SEPERATOR_CHAR) {
+                throw new MQTTProtocolException("The single level wildcard must stand alone: " + topicName);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5d6d42ce/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index c89c15e..fc401e9 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1370,6 +1370,208 @@ public class MQTTTest extends MQTTTestSupport {
     }
 
     @Test(timeout = 30 * 10000)
+    public void testSubscribeWithZeroLengthTopic() throws Exception {
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("MQTT-Client");
+        mqtt.setCleanSession(false);
+
+        Topic topic = new Topic("", QoS.EXACTLY_ONCE);
+
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        LOG.info("Trying to subscrobe to topic: {}", topic.name());
+
+        try {
+            connection.subscribe(new Topic[] { topic });
+            fail("Should not be able to subscribe with invalid Topic");
+        } catch (Exception ex) {
+            LOG.info("Caught expected error on subscribe");
+        }
+
+        assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !connection.isConnected();
+            }
+        }));
+    }
+
+    @Test(timeout = 30 * 10000)
+    public void testUnsubscribeWithZeroLengthTopic() throws Exception {
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("MQTT-Client");
+        mqtt.setCleanSession(false);
+
+        Topic topic = new Topic("", QoS.EXACTLY_ONCE);
+
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        LOG.info("Trying to subscrobe to topic: {}", topic.name());
+
+        try {
+            connection.unsubscribe(new String[] { topic.name().toString() });
+            fail("Should not be able to subscribe with invalid Topic");
+        } catch (Exception ex) {
+            LOG.info("Caught expected error on subscribe");
+        }
+
+        assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !connection.isConnected();
+            }
+        }));
+    }
+
+    @Test(timeout = 30 * 10000)
+    public void testSubscribeWithInvalidMultiLevelWildcards() throws Exception {
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("MQTT-Client");
+        mqtt.setCleanSession(false);
+
+        Topic[] topics = { new Topic("#/Foo", QoS.EXACTLY_ONCE),
+                           new Topic("#/Foo/#", QoS.EXACTLY_ONCE),
+                           new Topic("Foo/#/Level", QoS.EXACTLY_ONCE),
+                           new Topic("Foo/X#", QoS.EXACTLY_ONCE) };
+
+        for (int i = 0; i < topics.length; ++i) {
+            final BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+
+            LOG.info("Trying to subscrobe to topic: {}", topics[i].name());
+
+            try {
+                connection.subscribe(new Topic[] { topics[i] });
+                fail("Should not be able to subscribe with invalid Topic");
+            } catch (Exception ex) {
+                LOG.info("Caught expected error on subscribe");
+            }
+
+            assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return !connection.isConnected();
+                }
+            }));
+        }
+    }
+
+    @Test(timeout = 30 * 10000)
+    public void testSubscribeWithInvalidSingleLevelWildcards() throws Exception {
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("MQTT-Client");
+        mqtt.setCleanSession(false);
+
+        Topic[] topics = { new Topic("Foo+", QoS.EXACTLY_ONCE),
+                           new Topic("+Foo/#", QoS.EXACTLY_ONCE),
+                           new Topic("+#", QoS.EXACTLY_ONCE),
+                           new Topic("Foo/+X/Level", QoS.EXACTLY_ONCE),
+                           new Topic("Foo/+F", QoS.EXACTLY_ONCE) };
+
+        for (int i = 0; i < topics.length; ++i) {
+            final BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+
+            LOG.info("Trying to subscrobe to topic: {}", topics[i].name());
+
+            try {
+                connection.subscribe(new Topic[] { topics[i] });
+                fail("Should not be able to subscribe with invalid Topic");
+            } catch (Exception ex) {
+                LOG.info("Caught expected error on subscribe");
+            }
+
+            assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return !connection.isConnected();
+                }
+            }));
+        }
+    }
+
+    @Test(timeout = 30 * 10000)
+    public void testUnsubscribeWithInvalidMultiLevelWildcards() throws Exception {
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("MQTT-Client");
+        mqtt.setCleanSession(false);
+
+        Topic[] topics = { new Topic("#/Foo", QoS.EXACTLY_ONCE),
+                           new Topic("#/Foo/#", QoS.EXACTLY_ONCE),
+                           new Topic("Foo/#/Level", QoS.EXACTLY_ONCE),
+                           new Topic("Foo/X#", QoS.EXACTLY_ONCE) };
+
+        for (int i = 0; i < topics.length; ++i) {
+            final BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+
+            LOG.info("Trying to subscrobe to topic: {}", topics[i].name());
+
+            try {
+                connection.unsubscribe(new String[] { topics[i].name().toString() });
+                fail("Should not be able to unsubscribe with invalid Topic");
+            } catch (Exception ex) {
+                LOG.info("Caught expected error on subscribe");
+            }
+
+            assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return !connection.isConnected();
+                }
+            }));
+        }
+    }
+
+    @Test(timeout = 30 * 10000)
+    public void testUnsubscribeWithInvalidSingleLevelWildcards() throws Exception {
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("MQTT-Client");
+        mqtt.setCleanSession(false);
+
+        Topic[] topics = { new Topic("Foo+", QoS.EXACTLY_ONCE),
+                           new Topic("+Foo/#", QoS.EXACTLY_ONCE),
+                           new Topic("+#", QoS.EXACTLY_ONCE),
+                           new Topic("Foo/+X/Level", QoS.EXACTLY_ONCE),
+                           new Topic("Foo/+F", QoS.EXACTLY_ONCE) };
+
+        for (int i = 0; i < topics.length; ++i) {
+            final BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+
+            LOG.info("Trying to subscrobe to topic: {}", topics[i].name());
+
+            try {
+                connection.unsubscribe(new String[] { topics[i].name().toString() });
+                fail("Should not be able to unsubscribe with invalid Topic");
+            } catch (Exception ex) {
+                LOG.info("Caught expected error on subscribe");
+            }
+
+            assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return !connection.isConnected();
+                }
+            }));
+        }
+    }
+
+    @Test(timeout = 30 * 10000)
     public void testSubscribeMultipleTopics() throws Exception {
 
         byte[] payload = new byte[1024 * 32];