You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/10/01 12:18:46 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5377 - mqtt wildcard conversion
Repository: activemq
Updated Branches:
refs/heads/trunk 2d9475c4f -> fc3d90e8b
https://issues.apache.org/jira/browse/AMQ-5377 - mqtt wildcard conversion
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc3d90e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc3d90e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc3d90e8
Branch: refs/heads/trunk
Commit: fc3d90e8b740c1621ee1b51ac8d7ec6bec4c55fc
Parents: 2d9475c
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Wed Oct 1 12:18:32 2014 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Wed Oct 1 12:18:32 2014 +0200
----------------------------------------------------------------------
.../transport/mqtt/MQTTProtocolConverter.java | 1 -
.../transport/mqtt/MQTTProtocolSupport.java | 26 +++++++++++++++++++-
.../activemq/transport/mqtt/MQTTTest.java | 20 +++++++++++++++
3 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc3d90e8/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index d80b10a..c05c729 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -502,7 +502,6 @@ public class MQTTProtocolConverter {
destination = activeMQDestinationMap.get(command.topicName());
if (destination == null) {
String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
-
try {
destination = findSubscriptionStrategy().onSend(topicName);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc3d90e8/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
index a30ed50..90f8644 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
@@ -79,7 +79,31 @@ public class MQTTProtocolSupport {
* @return a destination name formatted for MQTT.
*/
public static String convertActiveMQToMQTT(String destinationName) {
- return destinationName.replace('.', '/');
+ char[] chars = destinationName.toCharArray();
+ for (int i = 0; i < chars.length; i++) {
+ switch(chars[i]) {
+ case '>':
+ chars[i] = '#';
+ break;
+ case '#':
+ chars[i] = '>';
+ break;
+ case '*':
+ chars[i] = '+';
+ break;
+ case '+':
+ chars[i] = '*';
+ break;
+ case '.':
+ chars[i] = '/';
+ break;
+ case '/':
+ chars[i] = '.';
+ break;
+ }
+ }
+ String rc = new String(chars);
+ return rc;
}
/**
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc3d90e8/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 3b4062d..1586ff4 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.mqtt;
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -330,6 +331,25 @@ public class MQTTTest extends MQTTTestSupport {
}
@Test(timeout = 2 * 60 * 1000)
+ public void testMQTTWildcard() throws Exception {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("");
+ mqtt.setCleanSession(true);
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ Topic[] topics = {new Topic(utf8("a/#"), QoS.values()[AT_MOST_ONCE])};
+ connection.subscribe(topics);
+ String payload = "Test Message";
+ String publishedTopic = "a/b/1.2.3*4>";
+ connection.publish(publishedTopic, payload.getBytes(), QoS.values()[AT_MOST_ONCE], false);
+
+ Message msg = connection.receive(1, TimeUnit.SECONDS);
+ assertEquals("Topic changed", publishedTopic, msg.getTopic());
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
public void testMQTTPathPatterns() throws Exception {
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("");