You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/03/04 13:27:30 UTC

camel git commit: CAMEL-8432 MQTT wildcard ('+') subscription broken with thanks to Mark

Repository: camel
Updated Branches:
  refs/heads/master 38d51034a -> c6a5e030f


CAMEL-8432 MQTT wildcard ('+') subscription broken with thanks to Mark


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c6a5e030
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c6a5e030
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c6a5e030

Branch: refs/heads/master
Commit: c6a5e030ffa30edd28a3daa0e91cc745037cd0eb
Parents: 38d5103
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Mar 4 20:26:44 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Mar 4 20:27:16 2015 +0800

----------------------------------------------------------------------
 .../camel/component/mqtt/MQTTComponent.java     |  5 ++
 .../camel/component/mqtt/MQTTBaseTest.java      |  4 +-
 .../component/mqtt/MQTTConfigurationTest.java   | 12 ++++
 .../mqtt/MQTTConsumerWildcardTopicsTest.java    | 69 ++++++++++++++++++++
 4 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java
index 7b03a83..6caa305 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java
@@ -54,6 +54,11 @@ public class MQTTComponent extends UriEndpointComponent {
         return endpoint;
     }
 
+    @Override
+    public boolean useRawUri() {
+        return true; // to prevent MQTT "+" wildcard from being lost
+    }
+
     public String getHost() {
         return host;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
index 54bf0d8..f535de2 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
@@ -28,9 +28,11 @@ public abstract class MQTTBaseTest extends CamelTestSupport {
     protected static final Logger LOG = LoggerFactory.getLogger(MQTTBaseTest.class);
     protected static final String TEST_TOPIC = "ComponentTestTopic";
     protected static final String TEST_TOPIC_2 = "AnotherTestTopic";
+    protected static final String TEST_WILDCARD_TOPIC = "base/+/#";
     protected static final String TEST_TOPICS = TEST_TOPIC + "," + TEST_TOPIC_2;
+    protected static final String TEST_TOPICS_WITH_WILDCARDS = TEST_TOPICS + "," + TEST_WILDCARD_TOPIC;
     protected BrokerService brokerService;
-    protected int numberOfMessages = 100;
+    protected int numberOfMessages = 10;
 
 
     public void setUp() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
index e66e6a1..bba2fa3 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
@@ -45,4 +45,16 @@ public class MQTTConfigurationTest extends MQTTBaseTest {
         assertEquals(mqttEndpoint.getConfiguration().getSubscribeTopicNames(), TEST_TOPICS);
         assertTrue(mqttEndpoint.getConfiguration().isByDefaultRetain());
     }
+
+    @Test
+    public void testWildcardSubscribeTopicsConfiguration() throws Exception {
+        Endpoint endpoint = context.getEndpoint("mqtt:todo?byDefaultRetain=true&qualityOfService=exactlyOnce&publishTopicName=" + TEST_TOPIC + "&subscribeTopicNames=" + TEST_TOPICS_WITH_WILDCARDS);
+        assertTrue("Endpoint not a MQTTEndpoint: " + endpoint, endpoint instanceof MQTTEndpoint);
+        MQTTEndpoint mqttEndpoint = (MQTTEndpoint) endpoint;
+
+        assertEquals(mqttEndpoint.getConfiguration().getQoS(), QoS.EXACTLY_ONCE);
+        assertEquals(mqttEndpoint.getConfiguration().getPublishTopicName(), TEST_TOPIC);
+        assertEquals(mqttEndpoint.getConfiguration().getSubscribeTopicNames(), TEST_TOPICS_WITH_WILDCARDS);
+        assertTrue(mqttEndpoint.getConfiguration().isByDefaultRetain());
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java
new file mode 100644
index 0000000..8337320
--- /dev/null
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.junit.Test;
+
+public class MQTTConsumerWildcardTopicsTest extends MQTTBaseTest {
+
+    private static final String[] PUBLISH_TOPICS = {
+        TEST_TOPIC,
+        TEST_TOPIC_2,
+        "base",                    // doesn't match wildcard
+        "base/foo",                // matches
+        "base/foo/bar",            // matches
+        "base/bat/data/baz/splat"  // matches
+    };
+
+    @Test
+    public void testConsumeMultipleTopicsWithWildcards() throws Exception {
+        MQTT mqtt = new MQTT();
+        BlockingConnection publisherConnection = mqtt.blockingConnection();
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(numberOfMessages * (PUBLISH_TOPICS.length - 1));
+
+        publisherConnection.connect();
+        String payload;
+        for (String topic : PUBLISH_TOPICS) {
+            for (int i = 0; i < numberOfMessages; i++) {
+                payload = "Topic " + topic + ", Message " + i;
+                publisherConnection.publish(topic, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+            }
+        }
+
+        mock.await(5, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+
+        return new RouteBuilder() {
+            public void configure() {
+                from("mqtt:bar?subscribeTopicNames=" + TEST_TOPICS_WITH_WILDCARDS)
+                    .transform(body().convertToString())
+                    .to("mock:result");
+            }
+        };
+    }
+}