You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/10/01 12:18:46 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5377 - mqtt wildcard conversion

Repository: activemq
Updated Branches:
  refs/heads/trunk 2d9475c4f -> fc3d90e8b


https://issues.apache.org/jira/browse/AMQ-5377 - mqtt wildcard conversion


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc3d90e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc3d90e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc3d90e8

Branch: refs/heads/trunk
Commit: fc3d90e8b740c1621ee1b51ac8d7ec6bec4c55fc
Parents: 2d9475c
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Wed Oct 1 12:18:32 2014 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Wed Oct 1 12:18:32 2014 +0200

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   |  1 -
 .../transport/mqtt/MQTTProtocolSupport.java     | 26 +++++++++++++++++++-
 .../activemq/transport/mqtt/MQTTTest.java       | 20 +++++++++++++++
 3 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fc3d90e8/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index d80b10a..c05c729 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -502,7 +502,6 @@ public class MQTTProtocolConverter {
             destination = activeMQDestinationMap.get(command.topicName());
             if (destination == null) {
                 String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
-
                 try {
                     destination = findSubscriptionStrategy().onSend(topicName);
                 } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc3d90e8/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
index a30ed50..90f8644 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java
@@ -79,7 +79,31 @@ public class MQTTProtocolSupport {
      * @return a destination name formatted for MQTT.
      */
     public static String convertActiveMQToMQTT(String destinationName) {
-        return destinationName.replace('.', '/');
+        char[] chars = destinationName.toCharArray();
+        for (int i = 0; i < chars.length; i++) {
+            switch(chars[i]) {
+                case '>':
+                    chars[i] = '#';
+                    break;
+                case '#':
+                    chars[i] = '>';
+                    break;
+                case '*':
+                    chars[i] = '+';
+                    break;
+                case '+':
+                    chars[i] = '*';
+                    break;
+                case '.':
+                    chars[i] = '/';
+                    break;
+                case '/':
+                    chars[i] = '.';
+                    break;
+            }
+        }
+        String rc = new String(chars);
+        return rc;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc3d90e8/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 3b4062d..1586ff4 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -330,6 +331,25 @@ public class MQTTTest extends MQTTTestSupport {
     }
 
     @Test(timeout = 2 *  60 * 1000)
+    public void testMQTTWildcard() throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("");
+        mqtt.setCleanSession(true);
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        Topic[] topics = {new Topic(utf8("a/#"), QoS.values()[AT_MOST_ONCE])};
+        connection.subscribe(topics);
+        String payload = "Test Message";
+        String publishedTopic = "a/b/1.2.3*4>";
+        connection.publish(publishedTopic, payload.getBytes(), QoS.values()[AT_MOST_ONCE], false);
+
+        Message msg = connection.receive(1, TimeUnit.SECONDS);
+        assertEquals("Topic changed", publishedTopic, msg.getTopic());
+    }
+
+    @Test(timeout = 2 *  60 * 1000)
     public void testMQTTPathPatterns() throws Exception {
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("");