You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/03/04 13:27:30 UTC
camel git commit: CAMEL-8432 MQTT wildcard ('+') subscription broken
with thanks to Mark
Repository: camel
Updated Branches:
refs/heads/master 38d51034a -> c6a5e030f
CAMEL-8432 MQTT wildcard ('+') subscription broken with thanks to Mark
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c6a5e030
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c6a5e030
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c6a5e030
Branch: refs/heads/master
Commit: c6a5e030ffa30edd28a3daa0e91cc745037cd0eb
Parents: 38d5103
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Mar 4 20:26:44 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Mar 4 20:27:16 2015 +0800
----------------------------------------------------------------------
.../camel/component/mqtt/MQTTComponent.java | 5 ++
.../camel/component/mqtt/MQTTBaseTest.java | 4 +-
.../component/mqtt/MQTTConfigurationTest.java | 12 ++++
.../mqtt/MQTTConsumerWildcardTopicsTest.java | 69 ++++++++++++++++++++
4 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java
index 7b03a83..6caa305 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java
@@ -54,6 +54,11 @@ public class MQTTComponent extends UriEndpointComponent {
return endpoint;
}
+ @Override
+ public boolean useRawUri() {
+ return true; // use the raw URI so the MQTT "+" single-level wildcard in topic names is not lost (presumably it would otherwise be URI-decoded away -- confirm against Camel's URI handling)
+ }
+
public String getHost() {
return host;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
index 54bf0d8..f535de2 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
@@ -28,9 +28,11 @@ public abstract class MQTTBaseTest extends CamelTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(MQTTBaseTest.class);
protected static final String TEST_TOPIC = "ComponentTestTopic";
protected static final String TEST_TOPIC_2 = "AnotherTestTopic";
+ protected static final String TEST_WILDCARD_TOPIC = "base/+/#";
protected static final String TEST_TOPICS = TEST_TOPIC + "," + TEST_TOPIC_2;
+ protected static final String TEST_TOPICS_WITH_WILDCARDS = TEST_TOPICS + "," + TEST_WILDCARD_TOPIC;
protected BrokerService brokerService;
- protected int numberOfMessages = 100;
+ protected int numberOfMessages = 10;
public void setUp() throws Exception {
http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
index e66e6a1..bba2fa3 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
@@ -45,4 +45,16 @@ public class MQTTConfigurationTest extends MQTTBaseTest {
assertEquals(mqttEndpoint.getConfiguration().getSubscribeTopicNames(), TEST_TOPICS);
assertTrue(mqttEndpoint.getConfiguration().isByDefaultRetain());
}
+
+    @Test
+    public void testWildcardSubscribeTopicsConfiguration() throws Exception {
+        Endpoint endpoint = context.getEndpoint("mqtt:todo?byDefaultRetain=true&qualityOfService=exactlyOnce&publishTopicName=" + TEST_TOPIC + "&subscribeTopicNames=" + TEST_TOPICS_WITH_WILDCARDS);
+        assertTrue("Endpoint not a MQTTEndpoint: " + endpoint, endpoint instanceof MQTTEndpoint);
+        MQTTEndpoint mqttEndpoint = (MQTTEndpoint) endpoint;
+        // Wildcard topic names must reach the configuration unaltered; assertEquals takes (expected, actual).
+        assertEquals(QoS.EXACTLY_ONCE, mqttEndpoint.getConfiguration().getQoS());
+        assertEquals(TEST_TOPIC, mqttEndpoint.getConfiguration().getPublishTopicName());
+        assertEquals(TEST_TOPICS_WITH_WILDCARDS, mqttEndpoint.getConfiguration().getSubscribeTopicNames());
+        assertTrue(mqttEndpoint.getConfiguration().isByDefaultRetain());
+    }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java
new file mode 100644
index 0000000..8337320
--- /dev/null
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.junit.Test;
+
+public class MQTTConsumerWildcardTopicsTest extends MQTTBaseTest {
+    // Topics published to; only "base" falls outside TEST_TOPICS_WITH_WILDCARDS, so one topic's messages are never consumed.
+    private static final String[] PUBLISH_TOPICS = {
+        TEST_TOPIC,
+        TEST_TOPIC_2,
+        "base",                     // doesn't match wildcard
+        "base/foo",                 // matches
+        "base/foo/bar",             // matches
+        "base/bat/data/baz/splat"   // matches
+    };
+
+    @Test
+    public void testConsumeMultipleTopicsWithWildcards() throws Exception {
+        BlockingConnection publisherConnection = new MQTT().blockingConnection();
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(numberOfMessages * (PUBLISH_TOPICS.length - 1));
+        publisherConnection.connect();
+        try {
+            for (String topic : PUBLISH_TOPICS) {
+                for (int i = 0; i < numberOfMessages; i++) {
+                    String payload = "Topic " + topic + ", Message " + i;
+                    publisherConnection.publish(topic, payload.getBytes("UTF-8"), QoS.AT_LEAST_ONCE, false);
+                }
+            }
+            mock.await(5, TimeUnit.SECONDS);
+            mock.assertIsSatisfied();
+        } finally {
+            publisherConnection.disconnect(); // release the publisher connection even when an assertion fails
+        }
+    }
+
+    // Consumes from the plain and wildcard topics and forwards each body, converted to String, to mock:result.
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("mqtt:bar?subscribeTopicNames=" + TEST_TOPICS_WITH_WILDCARDS)
+                    .transform(body().convertToString())
+                    .to("mock:result");
+            }
+        };
+    }
+}