You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/05 11:12:14 UTC

camel git commit: Fixed mqtt consumer to support async so it works in brdige mode and also fixes a block on windows due different threading model than linux.

Repository: camel
Updated Branches:
  refs/heads/master ef186eea8 -> 2ddbc2230


Fixed mqtt consumer to support async so it works in brdige mode and also fixes a block on windows due different threading model than linux.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2ddbc223
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2ddbc223
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2ddbc223

Branch: refs/heads/master
Commit: 2ddbc2230d856c8a20962f9f16dd777eddcc1c75
Parents: ef186ee
Author: Claus Ibsen <cl...@gmail.com>
Authored: Thu Mar 5 11:13:29 2015 +0100
Committer: Claus Ibsen <cl...@gmail.com>
Committed: Thu Mar 5 11:13:29 2015 +0100

----------------------------------------------------------------------
 .../camel/component/mqtt/MQTTConsumer.java       | 19 +++++++++++++++----
 .../camel/component/mqtt/MQTTBrigeTest.java      |  2 ++
 2 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2ddbc223/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
index 449a767..0a7c582 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.mqtt;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
@@ -44,15 +45,25 @@ public class MQTTConsumer extends DefaultConsumer {
         super.doStop();
     }
 
-    void processExchange(Exchange exchange) {
+    void processExchange(final Exchange exchange) {
+        boolean sync = true;
         try {
-            getProcessor().process(exchange);
+            sync = getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    if (exchange.getException() != null) {
+                        getExceptionHandler().handleException("Error processing exchange.", exchange, exchange.getException());
+                    }
+                }
+            });
         } catch (Throwable e) {
             exchange.setException(e);
         }
 
-        if (exchange.getException() != null) {
-            getExceptionHandler().handleException("Error processing exchange.", exchange, exchange.getException());
+        if (sync) {
+            if (exchange.getException() != null) {
+                getExceptionHandler().handleException("Error processing exchange.", exchange, exchange.getException());
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2ddbc223/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBrigeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBrigeTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBrigeTest.java
index 3732554..6b621b4 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBrigeTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBrigeTest.java
@@ -70,9 +70,11 @@ public class MQTTBrigeTest extends MQTTBaseTest {
             public void configure() {
                 // Bridge message over two MQTT topics
                 from("direct:start").to("mqtt:foo?publishTopicName=test/topic1");
+
                 from("mqtt:foo?subscribeTopicName=test/topic1").to("log:testlogger?showAll=true")
                     .to("mqtt:foo?publishTopicName=test/resulttopic")
                     .log(LoggingLevel.ERROR, "Message processed");
+
                 // Bridge message over two MQTT topics with a seda in between
                 from("direct:startWorkaround").to("mqtt:foo?publishTopicName=test/topic2");
                 from("mqtt:foo?subscribeTopicName=test/topic2").to("log:testlogger?showAll=true")