You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/05 11:12:14 UTC
camel git commit: Fixed mqtt consumer to support async so it works in
brdige mode and also fixes a block on windows due different threading model
than linux.
Repository: camel
Updated Branches:
refs/heads/master ef186eea8 -> 2ddbc2230
Fixed mqtt consumer to support async so it works in brdige mode and also fixes a block on windows due different threading model than linux.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2ddbc223
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2ddbc223
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2ddbc223
Branch: refs/heads/master
Commit: 2ddbc2230d856c8a20962f9f16dd777eddcc1c75
Parents: ef186ee
Author: Claus Ibsen <cl...@gmail.com>
Authored: Thu Mar 5 11:13:29 2015 +0100
Committer: Claus Ibsen <cl...@gmail.com>
Committed: Thu Mar 5 11:13:29 2015 +0100
----------------------------------------------------------------------
.../camel/component/mqtt/MQTTConsumer.java | 19 +++++++++++++++----
.../camel/component/mqtt/MQTTBrigeTest.java | 2 ++
2 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2ddbc223/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
index 449a767..0a7c582 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.mqtt;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
@@ -44,15 +45,25 @@ public class MQTTConsumer extends DefaultConsumer {
super.doStop();
}
- void processExchange(Exchange exchange) {
+ void processExchange(final Exchange exchange) {
+ boolean sync = true;
try {
- getProcessor().process(exchange);
+ sync = getAsyncProcessor().process(exchange, new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange.", exchange, exchange.getException());
+ }
+ }
+ });
} catch (Throwable e) {
exchange.setException(e);
}
- if (exchange.getException() != null) {
- getExceptionHandler().handleException("Error processing exchange.", exchange, exchange.getException());
+ if (sync) {
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange.", exchange, exchange.getException());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2ddbc223/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBrigeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBrigeTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBrigeTest.java
index 3732554..6b621b4 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBrigeTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBrigeTest.java
@@ -70,9 +70,11 @@ public class MQTTBrigeTest extends MQTTBaseTest {
public void configure() {
// Bridge message over two MQTT topics
from("direct:start").to("mqtt:foo?publishTopicName=test/topic1");
+
from("mqtt:foo?subscribeTopicName=test/topic1").to("log:testlogger?showAll=true")
.to("mqtt:foo?publishTopicName=test/resulttopic")
.log(LoggingLevel.ERROR, "Message processed");
+
// Bridge message over two MQTT topics with a seda in between
from("direct:startWorkaround").to("mqtt:foo?publishTopicName=test/topic2");
from("mqtt:foo?subscribeTopicName=test/topic2").to("log:testlogger?showAll=true")