You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ra...@apache.org on 2015/08/28 16:58:05 UTC
[1/2] camel git commit: CAMEL-9092 MQTT consumer receives duplicate
messages after broker restart.
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x 7cdb7c1f1 -> ac31039c9
refs/heads/master d1c7f6507 -> 17391a12e
CAMEL-9092 MQTT consumer receives duplicate messages after broker restart.
With thanks to Tomohisa Igarashi. Code merged with modifications.
This closes #601.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/17391a12
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/17391a12
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/17391a12
Branch: refs/heads/master
Commit: 17391a12e9e7a3158058d4e885c6af65141a1338
Parents: d1c7f65
Author: Raul Kripalani <ra...@apache.org>
Authored: Fri Aug 28 15:44:35 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Fri Aug 28 15:51:38 2015 +0100
----------------------------------------------------------------------
.../camel/component/mqtt/MQTTEndpoint.java | 139 +++++++++++++++-
.../component/mqtt/MQTTDuplicatesTest.java | 158 +++++++++++++++++++
.../src/test/resources/log4j.properties | 2 +-
3 files changed, 296 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/17391a12/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 92c8d17..89caedd 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -39,6 +39,21 @@ import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.Promise;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBCOMP;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.PUBREC;
+import org.fusesource.mqtt.codec.PUBREL;
+import org.fusesource.mqtt.codec.SUBACK;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,12 +72,127 @@ public class MQTTEndpoint extends DefaultEndpoint {
@UriPath @Metadata(required = "true")
private String name;
+
@UriParam
private final MQTTConfiguration configuration;
- public MQTTEndpoint(String uri, MQTTComponent component, MQTTConfiguration properties) {
+ public MQTTEndpoint(final String uri, MQTTComponent component, MQTTConfiguration properties) {
super(uri, component);
this.configuration = properties;
+ if (LOG.isTraceEnabled()) {
+ configuration.setTracer(new Tracer() {
+ @Override
+ public void debug(String message, Object...args) {
+ LOG.trace("tracer.debug() " + this + ": uri=" + uri + ", message=" + String.format(message, args));
+ }
+
+ @Override
+ public void onSend(MQTTFrame frame) {
+ String decoded = null;
+ try {
+ switch (frame.messageType()) {
+ case PINGREQ.TYPE:
+ decoded = new PINGREQ().decode(frame).toString();
+ break;
+ case PINGRESP.TYPE:
+ decoded = new PINGRESP().decode(frame).toString();
+ break;
+ case CONNECT.TYPE:
+ decoded = new CONNECT().decode(frame).toString();
+ break;
+ case DISCONNECT.TYPE:
+ decoded = new DISCONNECT().decode(frame).toString();
+ break;
+ case SUBSCRIBE.TYPE:
+ decoded = new SUBSCRIBE().decode(frame).toString();
+ break;
+ case UNSUBSCRIBE.TYPE:
+ decoded = new UNSUBSCRIBE().decode(frame).toString();
+ break;
+ case PUBLISH.TYPE:
+ decoded = new PUBLISH().decode(frame).toString();
+ break;
+ case PUBACK.TYPE:
+ decoded = new PUBACK().decode(frame).toString();
+ break;
+ case PUBREC.TYPE:
+ decoded = new PUBREC().decode(frame).toString();
+ break;
+ case PUBREL.TYPE:
+ decoded = new PUBREL().decode(frame).toString();
+ break;
+ case PUBCOMP.TYPE:
+ decoded = new PUBCOMP().decode(frame).toString();
+ break;
+ case CONNACK.TYPE:
+ decoded = new CONNACK().decode(frame).toString();
+ break;
+ case SUBACK.TYPE:
+ decoded = new SUBACK().decode(frame).toString();
+ break;
+ default:
+ decoded = frame.toString();
+ }
+ } catch (Throwable e) {
+ decoded = frame.toString();
+ }
+ LOG.trace("tracer.onSend() " + this + ": uri=" + uri + ", frame=" + decoded);
+ }
+
+ @Override
+ public void onReceive(MQTTFrame frame) {
+ String decoded = null;
+ try {
+ switch (frame.messageType()) {
+ case PINGREQ.TYPE:
+ decoded = new PINGREQ().decode(frame).toString();
+ break;
+ case PINGRESP.TYPE:
+ decoded = new PINGRESP().decode(frame).toString();
+ break;
+ case CONNECT.TYPE:
+ decoded = new CONNECT().decode(frame).toString();
+ break;
+ case DISCONNECT.TYPE:
+ decoded = new DISCONNECT().decode(frame).toString();
+ break;
+ case SUBSCRIBE.TYPE:
+ decoded = new SUBSCRIBE().decode(frame).toString();
+ break;
+ case UNSUBSCRIBE.TYPE:
+ decoded = new UNSUBSCRIBE().decode(frame).toString();
+ break;
+ case PUBLISH.TYPE:
+ decoded = new PUBLISH().decode(frame).toString();
+ break;
+ case PUBACK.TYPE:
+ decoded = new PUBACK().decode(frame).toString();
+ break;
+ case PUBREC.TYPE:
+ decoded = new PUBREC().decode(frame).toString();
+ break;
+ case PUBREL.TYPE:
+ decoded = new PUBREL().decode(frame).toString();
+ break;
+ case PUBCOMP.TYPE:
+ decoded = new PUBCOMP().decode(frame).toString();
+ break;
+ case CONNACK.TYPE:
+ decoded = new CONNACK().decode(frame).toString();
+ break;
+ case SUBACK.TYPE:
+ decoded = new SUBACK().decode(frame).toString();
+ break;
+ default:
+ decoded = frame.toString();
+ }
+ } catch (Throwable e) {
+ decoded = frame.toString();
+ }
+ LOG.trace("tracer.onReceive() " + this + ": uri=" + uri + ", frame=" + decoded);
+ }
+ });
+ }
}
@Override
@@ -109,7 +239,11 @@ public class MQTTEndpoint extends DefaultEndpoint {
}
public void onDisconnected() {
- connected = false;
+ // connected is deliberately not set to false here because the MQTT client should trigger its own reconnect;
+ // setting connected = false would make the publish() method launch a new connection while the original
+ // one is still reconnecting, likely leading to duplicate messages as observed in CAMEL-9092;
+ // if retries are exhausted and the client gives up, we should get a callback on onFailure, and only then can we
+ // set connected = false safely
LOG.debug("MQTT Connection disconnected from {}", configuration.getHost());
}
@@ -181,6 +315,7 @@ public class MQTTEndpoint extends DefaultEndpoint {
}
public void onFailure(Throwable value) {
+ LOG.debug("Failed to subscribe", value);
promise.onFailure(value);
connection.disconnect(null);
connected = false;
http://git-wip-us.apache.org/repos/asf/camel/blob/17391a12/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java
new file mode 100644
index 0000000..c397a1f
--- /dev/null
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that the MQTT consumer does not receive duplicate messages, both in normal operation and after a broker restart.
+ *
+ * @version
+ */
+public class MQTTDuplicatesTest extends MQTTBaseTest {
+
+ private static final int MESSAGE_COUNT = 50;
+ private static final int WAIT_MILLIS = 100;
+
+ @EndpointInject(uri = "mock:result")
+ protected MockEndpoint resultEndpoint;
+
+ @Produce(uri = "direct:withClientID")
+ protected ProducerTemplate templateWithClientID;
+
+ @Produce(uri = "direct:withoutClientID")
+ protected ProducerTemplate templateWithoutClientID;
+
+ @Test
+ public void testMqttDuplicates() throws Exception {
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ String body = System.currentTimeMillis() + ": Dummy! " + i;
+ templateWithClientID.asyncSendBody("direct:withClientID", body);
+ Thread.sleep(WAIT_MILLIS);
+ }
+
+ assertNoDuplicates();
+ }
+
+ @Test
+ public void testMqttDuplicatesAfterBrokerRestartWithoutClientID() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+
+ LOG.info(">>>>>>>>>> Restarting broker...");
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setAdvisorySupport(false);
+ brokerService.addConnector("mqtt://127.0.0.1:1883?trace=true");
+ brokerService.start();
+ brokerService.waitUntilStarted();
+ LOG.info(">>>>>>>>>> Broker restarted");
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ String body = System.currentTimeMillis() + ": Dummy-restart-without-clientID! " + i;
+ templateWithoutClientID.asyncSendBody("direct:withoutClientID", body);
+ Thread.sleep(WAIT_MILLIS);
+ }
+
+ assertNoDuplicates();
+ }
+
+ @Test
+ public void testMqttDuplicatesAfterBrokerRestartWithClientID() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+
+ LOG.info(">>>>>>>>>> Restarting broker...");
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setAdvisorySupport(false);
+ brokerService.addConnector("mqtt://127.0.0.1:1883?trace=true");
+ brokerService.start();
+ brokerService.waitUntilStarted();
+ LOG.info(">>>>>>>>>> Broker restarted");
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ String body = System.currentTimeMillis() + ": Dummy-restart-with-clientID! " + i;
+ templateWithClientID.asyncSendBody("direct:withClientID", body);
+ Thread.sleep(WAIT_MILLIS);
+ }
+
+ assertNoDuplicates();
+ }
+
+ private void assertNoDuplicates() {
+ List<Exchange> exchanges = resultEndpoint.getExchanges();
+ Assert.assertTrue("No message was delivered - something wrong happened", exchanges.size() > 0);
+ Set<String> values = new HashSet<String>();
+ List<String> duplicates = new ArrayList<String>();
+ for (Exchange e : exchanges) {
+ String body = e.getIn().getBody(String.class);
+ if (values.contains(body)) {
+ duplicates.add(body);
+ }
+ values.add(body);
+ }
+ Assert.assertTrue("Duplicate messages are detected: " + duplicates.toString(), duplicates.isEmpty());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+
+ // --------------------
+ // Without client ID:
+ // --------------------
+ from("direct:withoutClientID")
+ .routeId("SenderWithoutClientID")
+ .log("$$$$$ Sending message: ${body}")
+ .to("mqtt:sender?publishTopicName=test/topic1&qualityOfService=ExactlyOnce");
+
+ from("mqtt:reader?subscribeTopicName=test/topic1&qualityOfService=ExactlyOnce")
+ .routeId("ReceiverWithoutClientID")
+ .log("$$$$$ Received message: ${body}")
+ .to("mock:result");
+
+ // --------------------
+ // With client ID:
+ // --------------------
+ from("direct:withClientID")
+ .routeId("SenderWithClientID")
+ .log("$$$$$ Sending message: ${body}")
+ .to("mqtt:sender?publishTopicName=test/topic2&clientId=sender&qualityOfService=ExactlyOnce");
+
+ from("mqtt:reader?subscribeTopicName=test/topic2&clientId=receiver&qualityOfService=ExactlyOnce")
+ .routeId("ReceiverWithClientID")
+ .log("$$$$$ Received message: ${body}")
+ .to("mock:result");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/17391a12/components/camel-mqtt/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/resources/log4j.properties b/components/camel-mqtt/src/test/resources/log4j.properties
index b15a0cd..d7962ca 100644
--- a/components/camel-mqtt/src/test/resources/log4j.properties
+++ b/components/camel-mqtt/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@
log4j.rootLogger=info, file
#log4j.logger.twitter4j=DEBUG
-#log4j.logger.org.apache.camel.component.mqtt=DEBUG
+#log4j.logger.org.apache.camel.component.mqtt=TRACE
#log4j.logger.org.apache.camel=DEBUG
# CONSOLE appender not used by default
[2/2] camel git commit: CAMEL-9092 MQTT consumer receives duplicate
messages after broker restart.
Posted by ra...@apache.org.
CAMEL-9092 MQTT consumer receives duplicate messages after broker restart.
With thanks to Tomohisa Igarashi. Code merged with modifications.
This closes #601.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ac31039c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ac31039c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ac31039c
Branch: refs/heads/camel-2.15.x
Commit: ac31039c9d26e567698bf70aa8e727fed0cbfc42
Parents: 7cdb7c1
Author: Raul Kripalani <ra...@apache.org>
Authored: Fri Aug 28 15:44:35 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Fri Aug 28 15:57:39 2015 +0100
----------------------------------------------------------------------
.../camel/component/mqtt/MQTTEndpoint.java | 139 +++++++++++++++-
.../component/mqtt/MQTTDuplicatesTest.java | 158 +++++++++++++++++++
.../src/test/resources/log4j.properties | 2 +-
3 files changed, 296 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ac31039c/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index cfd2eb9..94b3008 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -39,6 +39,21 @@ import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.Promise;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBCOMP;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.PUBREC;
+import org.fusesource.mqtt.codec.PUBREL;
+import org.fusesource.mqtt.codec.SUBACK;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,14 +69,129 @@ public class MQTTEndpoint extends DefaultEndpoint {
private CallbackConnection connection;
@UriPath @Metadata(required = "true")
private String name;
+
@UriParam
private final MQTTConfiguration configuration;
private volatile boolean connected;
private final List<MQTTConsumer> consumers = new CopyOnWriteArrayList<MQTTConsumer>();
- public MQTTEndpoint(String uri, MQTTComponent component, MQTTConfiguration properties) {
+ public MQTTEndpoint(final String uri, MQTTComponent component, MQTTConfiguration properties) {
super(uri, component);
this.configuration = properties;
+ if (LOG.isTraceEnabled()) {
+ configuration.setTracer(new Tracer() {
+ @Override
+ public void debug(String message, Object...args) {
+ LOG.trace("tracer.debug() " + this + ": uri=" + uri + ", message=" + String.format(message, args));
+ }
+
+ @Override
+ public void onSend(MQTTFrame frame) {
+ String decoded = null;
+ try {
+ switch (frame.messageType()) {
+ case PINGREQ.TYPE:
+ decoded = new PINGREQ().decode(frame).toString();
+ break;
+ case PINGRESP.TYPE:
+ decoded = new PINGRESP().decode(frame).toString();
+ break;
+ case CONNECT.TYPE:
+ decoded = new CONNECT().decode(frame).toString();
+ break;
+ case DISCONNECT.TYPE:
+ decoded = new DISCONNECT().decode(frame).toString();
+ break;
+ case SUBSCRIBE.TYPE:
+ decoded = new SUBSCRIBE().decode(frame).toString();
+ break;
+ case UNSUBSCRIBE.TYPE:
+ decoded = new UNSUBSCRIBE().decode(frame).toString();
+ break;
+ case PUBLISH.TYPE:
+ decoded = new PUBLISH().decode(frame).toString();
+ break;
+ case PUBACK.TYPE:
+ decoded = new PUBACK().decode(frame).toString();
+ break;
+ case PUBREC.TYPE:
+ decoded = new PUBREC().decode(frame).toString();
+ break;
+ case PUBREL.TYPE:
+ decoded = new PUBREL().decode(frame).toString();
+ break;
+ case PUBCOMP.TYPE:
+ decoded = new PUBCOMP().decode(frame).toString();
+ break;
+ case CONNACK.TYPE:
+ decoded = new CONNACK().decode(frame).toString();
+ break;
+ case SUBACK.TYPE:
+ decoded = new SUBACK().decode(frame).toString();
+ break;
+ default:
+ decoded = frame.toString();
+ }
+ } catch (Throwable e) {
+ decoded = frame.toString();
+ }
+ LOG.trace("tracer.onSend() " + this + ": uri=" + uri + ", frame=" + decoded);
+ }
+
+ @Override
+ public void onReceive(MQTTFrame frame) {
+ String decoded = null;
+ try {
+ switch (frame.messageType()) {
+ case PINGREQ.TYPE:
+ decoded = new PINGREQ().decode(frame).toString();
+ break;
+ case PINGRESP.TYPE:
+ decoded = new PINGRESP().decode(frame).toString();
+ break;
+ case CONNECT.TYPE:
+ decoded = new CONNECT().decode(frame).toString();
+ break;
+ case DISCONNECT.TYPE:
+ decoded = new DISCONNECT().decode(frame).toString();
+ break;
+ case SUBSCRIBE.TYPE:
+ decoded = new SUBSCRIBE().decode(frame).toString();
+ break;
+ case UNSUBSCRIBE.TYPE:
+ decoded = new UNSUBSCRIBE().decode(frame).toString();
+ break;
+ case PUBLISH.TYPE:
+ decoded = new PUBLISH().decode(frame).toString();
+ break;
+ case PUBACK.TYPE:
+ decoded = new PUBACK().decode(frame).toString();
+ break;
+ case PUBREC.TYPE:
+ decoded = new PUBREC().decode(frame).toString();
+ break;
+ case PUBREL.TYPE:
+ decoded = new PUBREL().decode(frame).toString();
+ break;
+ case PUBCOMP.TYPE:
+ decoded = new PUBCOMP().decode(frame).toString();
+ break;
+ case CONNACK.TYPE:
+ decoded = new CONNACK().decode(frame).toString();
+ break;
+ case SUBACK.TYPE:
+ decoded = new SUBACK().decode(frame).toString();
+ break;
+ default:
+ decoded = frame.toString();
+ }
+ } catch (Throwable e) {
+ decoded = frame.toString();
+ }
+ LOG.trace("tracer.onReceive() " + this + ": uri=" + uri + ", frame=" + decoded);
+ }
+ });
+ }
}
@Override
@@ -105,7 +235,11 @@ public class MQTTEndpoint extends DefaultEndpoint {
}
public void onDisconnected() {
- connected = false;
+ // connected is deliberately not set to false here because the MQTT client should trigger its own reconnect;
+ // setting connected = false would make the publish() method launch a new connection while the original
+ // one is still reconnecting, likely leading to duplicate messages as observed in CAMEL-9092;
+ // if retries are exhausted and the client gives up, we should get a callback on onFailure, and only then can we
+ // set connected = false safely
LOG.debug("MQTT Connection disconnected from {}", configuration.getHost());
}
@@ -177,6 +311,7 @@ public class MQTTEndpoint extends DefaultEndpoint {
}
public void onFailure(Throwable value) {
+ LOG.debug("Failed to subscribe", value);
promise.onFailure(value);
connection.disconnect(null);
connected = false;
http://git-wip-us.apache.org/repos/asf/camel/blob/ac31039c/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java
new file mode 100644
index 0000000..c397a1f
--- /dev/null
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that the MQTT consumer does not receive duplicate messages, both in normal operation and after a broker restart.
+ *
+ * @version
+ */
+public class MQTTDuplicatesTest extends MQTTBaseTest {
+
+ private static final int MESSAGE_COUNT = 50;
+ private static final int WAIT_MILLIS = 100;
+
+ @EndpointInject(uri = "mock:result")
+ protected MockEndpoint resultEndpoint;
+
+ @Produce(uri = "direct:withClientID")
+ protected ProducerTemplate templateWithClientID;
+
+ @Produce(uri = "direct:withoutClientID")
+ protected ProducerTemplate templateWithoutClientID;
+
+ @Test
+ public void testMqttDuplicates() throws Exception {
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ String body = System.currentTimeMillis() + ": Dummy! " + i;
+ templateWithClientID.asyncSendBody("direct:withClientID", body);
+ Thread.sleep(WAIT_MILLIS);
+ }
+
+ assertNoDuplicates();
+ }
+
+ @Test
+ public void testMqttDuplicatesAfterBrokerRestartWithoutClientID() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+
+ LOG.info(">>>>>>>>>> Restarting broker...");
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setAdvisorySupport(false);
+ brokerService.addConnector("mqtt://127.0.0.1:1883?trace=true");
+ brokerService.start();
+ brokerService.waitUntilStarted();
+ LOG.info(">>>>>>>>>> Broker restarted");
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ String body = System.currentTimeMillis() + ": Dummy-restart-without-clientID! " + i;
+ templateWithoutClientID.asyncSendBody("direct:withoutClientID", body);
+ Thread.sleep(WAIT_MILLIS);
+ }
+
+ assertNoDuplicates();
+ }
+
+ @Test
+ public void testMqttDuplicatesAfterBrokerRestartWithClientID() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+
+ LOG.info(">>>>>>>>>> Restarting broker...");
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setAdvisorySupport(false);
+ brokerService.addConnector("mqtt://127.0.0.1:1883?trace=true");
+ brokerService.start();
+ brokerService.waitUntilStarted();
+ LOG.info(">>>>>>>>>> Broker restarted");
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ String body = System.currentTimeMillis() + ": Dummy-restart-with-clientID! " + i;
+ templateWithClientID.asyncSendBody("direct:withClientID", body);
+ Thread.sleep(WAIT_MILLIS);
+ }
+
+ assertNoDuplicates();
+ }
+
+ private void assertNoDuplicates() {
+ List<Exchange> exchanges = resultEndpoint.getExchanges();
+ Assert.assertTrue("No message was delivered - something wrong happened", exchanges.size() > 0);
+ Set<String> values = new HashSet<String>();
+ List<String> duplicates = new ArrayList<String>();
+ for (Exchange e : exchanges) {
+ String body = e.getIn().getBody(String.class);
+ if (values.contains(body)) {
+ duplicates.add(body);
+ }
+ values.add(body);
+ }
+ Assert.assertTrue("Duplicate messages are detected: " + duplicates.toString(), duplicates.isEmpty());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+
+ // --------------------
+ // Without client ID:
+ // --------------------
+ from("direct:withoutClientID")
+ .routeId("SenderWithoutClientID")
+ .log("$$$$$ Sending message: ${body}")
+ .to("mqtt:sender?publishTopicName=test/topic1&qualityOfService=ExactlyOnce");
+
+ from("mqtt:reader?subscribeTopicName=test/topic1&qualityOfService=ExactlyOnce")
+ .routeId("ReceiverWithoutClientID")
+ .log("$$$$$ Received message: ${body}")
+ .to("mock:result");
+
+ // --------------------
+ // With client ID:
+ // --------------------
+ from("direct:withClientID")
+ .routeId("SenderWithClientID")
+ .log("$$$$$ Sending message: ${body}")
+ .to("mqtt:sender?publishTopicName=test/topic2&clientId=sender&qualityOfService=ExactlyOnce");
+
+ from("mqtt:reader?subscribeTopicName=test/topic2&clientId=receiver&qualityOfService=ExactlyOnce")
+ .routeId("ReceiverWithClientID")
+ .log("$$$$$ Received message: ${body}")
+ .to("mock:result");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ac31039c/components/camel-mqtt/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/resources/log4j.properties b/components/camel-mqtt/src/test/resources/log4j.properties
index b15a0cd..d7962ca 100644
--- a/components/camel-mqtt/src/test/resources/log4j.properties
+++ b/components/camel-mqtt/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@
log4j.rootLogger=info, file
#log4j.logger.twitter4j=DEBUG
-#log4j.logger.org.apache.camel.component.mqtt=DEBUG
+#log4j.logger.org.apache.camel.component.mqtt=TRACE
#log4j.logger.org.apache.camel=DEBUG
# CONSOLE appender not used by default