You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/17 10:38:57 UTC
[4/6] camel git commit: Fixes #635. Create exchange the correct way.
Fixes #635. Create exchange the correct way.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9e5a51ca
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9e5a51ca
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9e5a51ca
Branch: refs/heads/master
Commit: 9e5a51ca51c495e1038f7bd861372e032cd1dbc9
Parents: dbe9aa5
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:15:40 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:19 2015 +0200
----------------------------------------------------------------------
.../apache/camel/component/paho/PahoConsumer.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9e5a51ca/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 86dee14..82b6c5f 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.builder.ExchangeBuilder;
import org.apache.camel.impl.DefaultConsumer;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -44,7 +43,7 @@ public class PahoConsumer extends DefaultConsumer {
getEndpoint().getClient().setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
- LOG.debug("MQTT broker connection lost:", cause);
+ LOG.debug("MQTT broker connection lost due " + cause.getMessage(), cause);
}
@Override
@@ -65,15 +64,15 @@ public class PahoConsumer extends DefaultConsumer {
headerKey = PahoConstants.HEASER_MQTT_PROPERTIES;
headerValue = props;
}
-
- Exchange exchange = ExchangeBuilder.anExchange(getEndpoint().getCamelContext()).
- withBody(message.getPayload()).
- withHeader(headerKey, headerValue).
- build();
+
+ Exchange exchange = getEndpoint().createExchange();
+ exchange.getIn().setBody(message.getPayload());
+ exchange.getIn().setHeader(headerKey, headerValue);
+
getAsyncProcessor().process(exchange, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
-
+ // noop
}
});
}
@@ -88,6 +87,7 @@ public class PahoConsumer extends DefaultConsumer {
@Override
protected void doStop() throws Exception {
super.doStop();
+
if (getEndpoint().getClient().isConnected()) {
String topic = getEndpoint().getTopic();
getEndpoint().getClient().unsubscribe(topic);