You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2016/04/15 11:00:48 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6253 - mqtt composite destinations support for virtual topic subscriptions
Repository: activemq
Updated Branches:
refs/heads/master 19fd084a8 -> 6d20cba0e
https://issues.apache.org/jira/browse/AMQ-6253 - mqtt composite destinations support for virtual topic subscriptions
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6d20cba0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6d20cba0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6d20cba0
Branch: refs/heads/master
Commit: 6d20cba0e4de8970da0236f2c772ef66c0f02661
Parents: 19fd084
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Fri Apr 15 10:58:19 2016 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Fri Apr 15 11:00:39 2016 +0200
----------------------------------------------------------------------
.../MQTTVirtualTopicSubscriptionStrategy.java | 19 +++++++++++--
.../activemq/transport/mqtt/MQTTTest.java | 29 ++++++++++++++++++++
2 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/6d20cba0/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
index 434c248..f802991 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
@@ -182,10 +182,25 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
@Override
public ActiveMQDestination onSend(String topicName) {
+ ActiveMQTopic topic = new ActiveMQTopic(topicName);
+ if (topic.isComposite()) {
+ ActiveMQDestination[] composites = topic.getCompositeDestinations();
+ for (ActiveMQDestination composite : composites) {
+ composite.setPhysicalName(prefix(composite.getPhysicalName()));
+ }
+ ActiveMQTopic result = new ActiveMQTopic();
+ result.setCompositeDestinations(composites);
+ return result;
+ } else {
+ return new ActiveMQTopic(prefix(topicName));
+ }
+ }
+
+ private String prefix(String topicName) {
if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
- return new ActiveMQTopic(VIRTUALTOPIC_PREFIX + topicName);
+ return VIRTUALTOPIC_PREFIX + topicName;
} else {
- return new ActiveMQTopic(topicName);
+ return topicName;
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6d20cba0/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index fc401e9..a84cf21 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -380,6 +380,35 @@ public class MQTTTest extends MQTTTestSupport {
}
@Test(timeout = 2 * 60 * 1000)
+ public void testMQTTCompositeDestinations() throws Exception {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("");
+ mqtt.setCleanSession(true);
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ Topic[] topics = {new Topic(utf8("a/1"), QoS.values()[AT_MOST_ONCE]), new Topic(utf8("a/2"), QoS.values()[AT_MOST_ONCE])};
+ connection.subscribe(topics);
+
+ String payload = "Test Message";
+ String publishedTopic = "a/1,a/2";
+ connection.publish(publishedTopic, payload.getBytes(), QoS.values()[AT_MOST_ONCE], false);
+
+ Message msg = connection.receive(1, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ assertEquals("a/2", msg.getTopic());
+
+ msg = connection.receive(1, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ assertEquals("a/1", msg.getTopic());
+
+ msg = connection.receive(1, TimeUnit.SECONDS);
+ assertNull(msg);
+
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
public void testMQTTPathPatterns() throws Exception {
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("");