You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/03/02 18:32:07 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5883
https://issues.apache.org/jira/browse/AMQ-5884
https://issues.apache.org/jira/browse/AMQ-5885
Repository: activemq
Updated Branches:
refs/heads/master 8ef44452a -> 5d6d42ce9
https://issues.apache.org/jira/browse/AMQ-5883
https://issues.apache.org/jira/browse/AMQ-5884
https://issues.apache.org/jira/browse/AMQ-5885
Add additional validation of Topic names used in subscribe and
unsubscribe that tests for spec compliance.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5d6d42ce
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5d6d42ce
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5d6d42ce
Branch: refs/heads/master
Commit: 5d6d42ce97a8a6cacd630574e54b330275d041a7
Parents: 8ef4445
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Mar 2 12:30:54 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Mar 2 12:30:54 2016 -0500
----------------------------------------------------------------------
.../transport/mqtt/MQTTProtocolConverter.java | 12 +-
.../transport/mqtt/MQTTProtocolSupport.java | 87 ++++++++
.../activemq/transport/mqtt/MQTTTest.java | 202 +++++++++++++++++++
3 files changed, 298 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/5d6d42ce/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 97a74a9..154ec53 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -367,6 +367,7 @@ public class MQTTProtocolConverter {
if (topics != null) {
byte[] qos = new byte[topics.length];
for (int i = 0; i < topics.length; i++) {
+ MQTTProtocolSupport.validate(topics[i].name().toString());
try {
qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]);
} catch (IOException e) {
@@ -383,6 +384,7 @@ public class MQTTProtocolConverter {
}
} else {
LOG.warn("No topics defined for Subscription " + command);
+ throw new MQTTProtocolException("SUBSCRIBE command received with no topic filter");
}
}
@@ -394,16 +396,20 @@ public class MQTTProtocolConverter {
UTF8Buffer[] topics = command.topics();
if (topics != null) {
for (UTF8Buffer topic : topics) {
+ MQTTProtocolSupport.validate(topic.toString());
try {
findSubscriptionStrategy().onUnSubscribe(topic.toString());
} catch (IOException e) {
throw new MQTTProtocolException("Failed to process unsubscribe request", true, e);
}
}
+ UNSUBACK ack = new UNSUBACK();
+ ack.messageId(command.messageId());
+ sendToMQTT(ack.encode());
+ } else {
+ LOG.warn("No topics defined for Subscription " + command);
+ throw new MQTTProtocolException("UNSUBSCRIBE command received with no topic filter");
}
- UNSUBACK ack = new UNSUBACK();
- ack.messageId(command.messageId());
- sendToMQTT(ack.encode());
}
/**
http://git-wip-us.apache.org/repos/asf/activemq/blob/5d6d42ce/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
index 90f8644..4a4d3c5 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.transport.mqtt;
+import java.io.UnsupportedEncodingException;
+
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.PINGREQ;
@@ -32,6 +34,16 @@ import org.fusesource.mqtt.codec.UNSUBSCRIBE;
*/
public class MQTTProtocolSupport {
+ private static final int TOPIC_NAME_MIN_LENGTH = 1;
+ private static final int TOPIC_NAME_MAX_LENGTH = 65535;
+
+ private static final String MULTI_LEVEL_WILDCARD = "#";
+ private static final String SINGLE_LEVEL_WILDCARD = "+";
+
+ private static final char MULTI_LEVEL_WILDCARD_CHAR = '#';
+ private static final char SINGLE_LEVEL_WILDCARD_CHAR = '+';
+ private static final char TOPIC_LEVEL_SEPERATOR_CHAR = '/';
+
/**
* Converts an MQTT formatted Topic name into a suitable ActiveMQ Destination
* name string.
@@ -142,4 +154,79 @@ public class MQTTProtocolSupport {
return "UNKNOWN";
}
}
+
+ /**
+ * Validate that the Topic names given by client commands are valid
+ * based on the MQTT protocol specification.
+ *
+ * @param topicName
+ * the given Topic name provided by the client.
+ *
+ * @throws MQTTProtocolException if the value given is invalid.
+ */
+ public static void validate(String topicName) throws MQTTProtocolException {
+ int topicLen = 0;
+ try {
+ topicLen = topicName.getBytes("UTF-8").length;
+ } catch (UnsupportedEncodingException e) {
+ throw new MQTTProtocolException("Topic name contained invalid UTF-8 encoding.");
+ }
+
+ // Spec: Unless stated otherwise all UTF-8 encoded strings can have any length in
+ // the range 0 to 65535 bytes.
+ if (topicLen < TOPIC_NAME_MIN_LENGTH || topicLen > TOPIC_NAME_MAX_LENGTH) {
+ throw new MQTTProtocolException("Topic name given had invliad length.");
+ }
+
+ // 4.7.1.2 and 4.7.1.3 these can stand alone
+ if (MULTI_LEVEL_WILDCARD.equals(topicName) || SINGLE_LEVEL_WILDCARD.equals(topicName)) {
+ return;
+ }
+
+ // Spec: 4.7.1.2
+ // The multi-level wildcard character MUST be specified either on its own or following a
+ // topic level separator. In either case it MUST be the last character specified in the
+ // Topic Filter [MQTT-4.7.1-2].
+ int numWildCards = 0;
+ for (int i = 0; i < topicName.length(); ++i) {
+ if (topicName.charAt(i) == MULTI_LEVEL_WILDCARD_CHAR) {
+ numWildCards++;
+
+ // If prev exists it must be a separator
+ if (i > 0 && topicName.charAt(i - 1) != TOPIC_LEVEL_SEPERATOR_CHAR) {
+ throw new MQTTProtocolException("The multi level wildcard must stand alone: " + topicName);
+ }
+ }
+
+ if (numWildCards > 1) {
+ throw new MQTTProtocolException("Topic Filter can only have one multi-level filter: " + topicName);
+ }
+ }
+
+ if (topicName.contains(MULTI_LEVEL_WILDCARD) && !topicName.endsWith(MULTI_LEVEL_WILDCARD)) {
+ throw new MQTTProtocolException("The multi-level filter must be at the end of the Topic name: " + topicName);
+ }
+
+ // Spec: 4.7.1.3
+ // The single-level wildcard can be used at any level in the Topic Filter, including
+ // first and last levels. Where it is used it MUST occupy an entire level of the filter
+ //
+ // [MQTT-4.7.1-3]. It can be used at more than one level in the Topic Filter and can be
+ // used in conjunction with the multilevel wildcard.
+ for (int i = 0; i < topicName.length(); ++i) {
+ if (topicName.charAt(i) != SINGLE_LEVEL_WILDCARD_CHAR) {
+ continue;
+ }
+
+ // If prev exists it must be a separator
+ if (i > 0 && topicName.charAt(i - 1) != TOPIC_LEVEL_SEPERATOR_CHAR) {
+ throw new MQTTProtocolException("The single level wildcard must stand alone: " + topicName);
+ }
+
+ // If next exists it must be a separator
+ if (i < topicName.length() - 1 && topicName.charAt(i + 1) != TOPIC_LEVEL_SEPERATOR_CHAR) {
+ throw new MQTTProtocolException("The single level wildcard must stand alone: " + topicName);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/5d6d42ce/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index c89c15e..fc401e9 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1370,6 +1370,208 @@ public class MQTTTest extends MQTTTestSupport {
}
@Test(timeout = 30 * 10000)
+ public void testSubscribeWithZeroLengthTopic() throws Exception {
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("MQTT-Client");
+ mqtt.setCleanSession(false);
+
+ Topic topic = new Topic("", QoS.EXACTLY_ONCE);
+
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ LOG.info("Trying to subscrobe to topic: {}", topic.name());
+
+ try {
+ connection.subscribe(new Topic[] { topic });
+ fail("Should not be able to subscribe with invalid Topic");
+ } catch (Exception ex) {
+ LOG.info("Caught expected error on subscribe");
+ }
+
+ assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !connection.isConnected();
+ }
+ }));
+ }
+
+ @Test(timeout = 30 * 10000)
+ public void testUnsubscribeWithZeroLengthTopic() throws Exception {
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("MQTT-Client");
+ mqtt.setCleanSession(false);
+
+ Topic topic = new Topic("", QoS.EXACTLY_ONCE);
+
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ LOG.info("Trying to subscrobe to topic: {}", topic.name());
+
+ try {
+ connection.unsubscribe(new String[] { topic.name().toString() });
+ fail("Should not be able to subscribe with invalid Topic");
+ } catch (Exception ex) {
+ LOG.info("Caught expected error on subscribe");
+ }
+
+ assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !connection.isConnected();
+ }
+ }));
+ }
+
+ @Test(timeout = 30 * 10000)
+ public void testSubscribeWithInvalidMultiLevelWildcards() throws Exception {
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("MQTT-Client");
+ mqtt.setCleanSession(false);
+
+ Topic[] topics = { new Topic("#/Foo", QoS.EXACTLY_ONCE),
+ new Topic("#/Foo/#", QoS.EXACTLY_ONCE),
+ new Topic("Foo/#/Level", QoS.EXACTLY_ONCE),
+ new Topic("Foo/X#", QoS.EXACTLY_ONCE) };
+
+ for (int i = 0; i < topics.length; ++i) {
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ LOG.info("Trying to subscrobe to topic: {}", topics[i].name());
+
+ try {
+ connection.subscribe(new Topic[] { topics[i] });
+ fail("Should not be able to subscribe with invalid Topic");
+ } catch (Exception ex) {
+ LOG.info("Caught expected error on subscribe");
+ }
+
+ assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !connection.isConnected();
+ }
+ }));
+ }
+ }
+
+ @Test(timeout = 30 * 10000)
+ public void testSubscribeWithInvalidSingleLevelWildcards() throws Exception {
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("MQTT-Client");
+ mqtt.setCleanSession(false);
+
+ Topic[] topics = { new Topic("Foo+", QoS.EXACTLY_ONCE),
+ new Topic("+Foo/#", QoS.EXACTLY_ONCE),
+ new Topic("+#", QoS.EXACTLY_ONCE),
+ new Topic("Foo/+X/Level", QoS.EXACTLY_ONCE),
+ new Topic("Foo/+F", QoS.EXACTLY_ONCE) };
+
+ for (int i = 0; i < topics.length; ++i) {
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ LOG.info("Trying to subscrobe to topic: {}", topics[i].name());
+
+ try {
+ connection.subscribe(new Topic[] { topics[i] });
+ fail("Should not be able to subscribe with invalid Topic");
+ } catch (Exception ex) {
+ LOG.info("Caught expected error on subscribe");
+ }
+
+ assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !connection.isConnected();
+ }
+ }));
+ }
+ }
+
+ @Test(timeout = 30 * 10000)
+ public void testUnsubscribeWithInvalidMultiLevelWildcards() throws Exception {
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("MQTT-Client");
+ mqtt.setCleanSession(false);
+
+ Topic[] topics = { new Topic("#/Foo", QoS.EXACTLY_ONCE),
+ new Topic("#/Foo/#", QoS.EXACTLY_ONCE),
+ new Topic("Foo/#/Level", QoS.EXACTLY_ONCE),
+ new Topic("Foo/X#", QoS.EXACTLY_ONCE) };
+
+ for (int i = 0; i < topics.length; ++i) {
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ LOG.info("Trying to subscrobe to topic: {}", topics[i].name());
+
+ try {
+ connection.unsubscribe(new String[] { topics[i].name().toString() });
+ fail("Should not be able to unsubscribe with invalid Topic");
+ } catch (Exception ex) {
+ LOG.info("Caught expected error on subscribe");
+ }
+
+ assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !connection.isConnected();
+ }
+ }));
+ }
+ }
+
+ @Test(timeout = 30 * 10000)
+ public void testUnsubscribeWithInvalidSingleLevelWildcards() throws Exception {
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("MQTT-Client");
+ mqtt.setCleanSession(false);
+
+ Topic[] topics = { new Topic("Foo+", QoS.EXACTLY_ONCE),
+ new Topic("+Foo/#", QoS.EXACTLY_ONCE),
+ new Topic("+#", QoS.EXACTLY_ONCE),
+ new Topic("Foo/+X/Level", QoS.EXACTLY_ONCE),
+ new Topic("Foo/+F", QoS.EXACTLY_ONCE) };
+
+ for (int i = 0; i < topics.length; ++i) {
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ LOG.info("Trying to subscrobe to topic: {}", topics[i].name());
+
+ try {
+ connection.unsubscribe(new String[] { topics[i].name().toString() });
+ fail("Should not be able to unsubscribe with invalid Topic");
+ } catch (Exception ex) {
+ LOG.info("Caught expected error on subscribe");
+ }
+
+ assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !connection.isConnected();
+ }
+ }));
+ }
+ }
+
+ @Test(timeout = 30 * 10000)
public void testSubscribeMultipleTopics() throws Exception {
byte[] payload = new byte[1024 * 32];