You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2016/04/15 11:00:48 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6253 - mqtt composite destinations support for virtual topic subscriptions

Repository: activemq
Updated Branches:
  refs/heads/master 19fd084a8 -> 6d20cba0e


https://issues.apache.org/jira/browse/AMQ-6253 - mqtt composite destinations support for virtual topic subscriptions


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6d20cba0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6d20cba0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6d20cba0

Branch: refs/heads/master
Commit: 6d20cba0e4de8970da0236f2c772ef66c0f02661
Parents: 19fd084
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Fri Apr 15 10:58:19 2016 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Fri Apr 15 11:00:39 2016 +0200

----------------------------------------------------------------------
 .../MQTTVirtualTopicSubscriptionStrategy.java   | 19 +++++++++++--
 .../activemq/transport/mqtt/MQTTTest.java       | 29 ++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6d20cba0/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
index 434c248..f802991 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
@@ -182,10 +182,25 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
 
     @Override
     public ActiveMQDestination onSend(String topicName) {
+        ActiveMQTopic topic = new ActiveMQTopic(topicName);
+        if (topic.isComposite()) {
+           ActiveMQDestination[] composites = topic.getCompositeDestinations();
+           for (ActiveMQDestination composite : composites) {
+                composite.setPhysicalName(prefix(composite.getPhysicalName()));
+           }
+           ActiveMQTopic result = new ActiveMQTopic();
+           result.setCompositeDestinations(composites);
+           return result;
+        } else {
+          return new ActiveMQTopic(prefix(topicName));
+        }
+    }
+
+    private String prefix(String topicName) {
         if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
-            return new ActiveMQTopic(VIRTUALTOPIC_PREFIX + topicName);
+            return VIRTUALTOPIC_PREFIX + topicName;
         } else {
-            return new ActiveMQTopic(topicName);
+            return topicName;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/6d20cba0/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index fc401e9..a84cf21 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -380,6 +380,35 @@ public class MQTTTest extends MQTTTestSupport {
     }
 
     @Test(timeout = 2 *  60 * 1000)
+    public void testMQTTCompositeDestinations() throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("");
+        mqtt.setCleanSession(true);
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        Topic[] topics = {new Topic(utf8("a/1"), QoS.values()[AT_MOST_ONCE]), new Topic(utf8("a/2"), QoS.values()[AT_MOST_ONCE])};
+        connection.subscribe(topics);
+
+        String payload = "Test Message";
+        String publishedTopic = "a/1,a/2";
+        connection.publish(publishedTopic, payload.getBytes(), QoS.values()[AT_MOST_ONCE], false);
+
+        Message msg = connection.receive(1, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        assertEquals("a/2", msg.getTopic());
+
+        msg = connection.receive(1, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        assertEquals("a/1", msg.getTopic());
+
+        msg = connection.receive(1, TimeUnit.SECONDS);
+        assertNull(msg);
+
+    }
+
+    @Test(timeout = 2 *  60 * 1000)
     public void testMQTTPathPatterns() throws Exception {
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("");